diff options
author | Ralph Straubs <rs8887@att.com> | 2017-04-21 04:23:55 -0500 |
---|---|---|
committer | Ralph Straubs <rs8887@att.com> | 2017-04-21 05:55:16 -0500 |
commit | 5bd9ff4130799247be3be88fc02f3ce0efd014c0 (patch) | |
tree | 327ed17fae044265b08c419913290e2f9bcddcec /policy-endpoints/src/main | |
parent | 8f7e5de17521836a96fa1c771a1651f63216675c (diff) |
Batch submit -- force rebuild
[ECOMPD2TD-1126 1707] Delay the start of the election handler by one
more update interval to allow updatedDate to be set and recoginized by
other nodes.
[ECOMPD2TD-1073 1707] Remove pessimistic locks from DB access to prevent
DB deadlock
[US867397-policy-management-feature-api] Split original FeatureAPI into
three service interfaces.
These are:
- the original FeatureAPI for core
- PolicyControllerFeatureAPI (additional interface methods yet to
come ..)
- PolicyEngineFeatureAPI (additional interface methods yet to come ..)
Additional extensions will be done in the future.
This is to have equivalent functionality with what it is already there.
[US867397-policy-management-feature-api] Split original FeatureAPI into
three service interfaces.
These are:
- the original FeatureAPI for core
- PolicyControllerFeatureAPI (additional interface methods yet to
come ..)
- PolicyEngineFeatureAPI (additional interface methods yet to come ..)
Additional extensions will be done in the future.
This is to have equivalent functionality with what it is already there.
[ECOMPD2TD-1167] Avoid redundant dependencies and remove obsolete sql
scripts
[ECOMPD2TD-848] Fix copied from ATT droolspdp repo -- add timestamp to
metric and audit logs
[ECOMPD2TD-1167] update jetty version to 9.3.14
[ECOMPD2TD-1167] pull in policy-core jar as a dependency of
policy-management, instead of zip file
[ECOMPD2TD-1159] Add property to ignore repository audit errors
The property is called 'repository.audit.ignore.errors' -- it resides in
'IntegrityMonitor.properties', and it defaults to 'true'. When true, any
errors that occur in the repository audit are logged, but not reported
to integrity monitor.
[US867397] additional hooks for policy engine, and support operation
take over by feature bypassing further feature processing
[US867397] additional hooks for policy engine, and support operation
take over by feature bypassing further feature processing
[US867397] fix reverse return javadoc comment
[ECOMPD2TD-1192] Select correct persistent session information
This is a modification to 'DroolsPdpsElectionHandler.DesignatedWaiter'.
When going active, the list of persistent sessions needs to come from
the most recent active DroolsPDP. The most recent active DroolsPDP is
tracked in a local variable 'mostRecentPrimary', and the associated
list of sessions needs to be extracted just prior to going active.
The problem was that the list of sessions was extracted after the
current DroolsPDP was set to 'mostRecentPrimary'.
[ECOMPD2TD-1073 1707] Remove pessimistic locks from DB access to
prevent DB deadlock
[US860546] Add workaround for missing logger in message router
[US860546] add DME2 properties to PolicyProperties
[US860546] add DME2 support, pending unit testing
[US860546] updated unit tests for dmaap/DME2
[US860546] removed System.out.println lines from BusConsumer and
BusPublisher
[US860546] modifications and additions to TopicEndpoint tests
[US860546] remove block to append /events to serviceName in
BusConsumer, BusPublisher
[US860546] removing dmaap unit tests with ATT references
[ECOMPD2TD-1167] Audit release repository by default
[ECOMPD2TD-000] Cherry-pick versioning fixes from 1707.39.1 release
branch
[ECOMPD2TD-000] Fix policy-endpoints cambriaClient version
[US865296] ECOMP Policy Logging Compliance, remove redundant timestamp
from metric and audit log for logging compliance
[US865296] ECOMP Policy Logging Compliance, remove TargetVirtualEntity
from audit log, provide name to un-used column
[US865296] ECOMP Policy Logging Compliance, remove Unused from
AuditLogPattern, leave the column blank means un-used.
[US000000] Cleanup and performance improvements
1) Changed 'OrderedServiceImpl' to ensure that the same service instance
is used in every queue. This gets around the 'ServiceLoader' behavior
that generates a new instance for each interface the service
implements.
2) In 'PolicySession': add adaptive poll times within Drools sessions.
Instead of always polling every 5 seconds, poll frequency increases
when rules fire, and decreases when they don't. At present, the
poll delay ranges from 100 milliseconds to 5 seconds, but at some
point, it would be nice to make this configurable.
3) In 'PolicySession': add tests for 'logger.isDebugEnabled()' in
session listener methods -- this results in a significant amount of
real-time savings under load.
[ECOMPD2TD-1126 1707] Changes to the election handler to deal with
situations (race conditions) which can affect the site choice
[ECOMPD2TD-1126 1707] Clean up of site afinity logic. Minor changes
and a fix of the case where all pdpd are designated or all are
hot standby.
[ECOMPD2TD-1126 1707] Fixed issue with election handler and got all
StandbyStateManagmentTest JUnits working.
[ECOMPD2TD-1126 1707] Hopefully final changes to
DroolsPdpsElectionHandler and StateManagementTest. Also removed the
ResiliencyTestCases since they were redundant.
[ECOMPD2TD-1126 1707] Found a minor bug in the computeMostRecentPrimary
algorithm which I corrected and added a JUnit to confirm.
[ECOMPD2TD-1126 1707] Removed code that was commented out of election
handler to improve reability. Added an explanatory note to
StandbyStateManagementTest.
[US869183] Convert integrity monitor test endpoint to use
HttpServletServer
[US869183] add swagger-maven-plugin, update swagger annotation for
IM test interface
[US860371] Modified existing Dmaap and Ueb code to allow for https
connectivity. HTTP/HTTPS is configurable via controller.properties file
[US860520] Removed extra useHttps() method call from BusConsumer class
where the consumer should be using http. Also, restored pom.xml to
version at time feature branch was created.
[US860520] Removed TopicEndpointTest.java.
Added the ability to configure use of self-signed certificates for
consumer topics. Functionality cannot be added for publisher at
this time.
Removed useHttps parameter field from multiple .builder methods to
allow for greater ability to build source/sink objects without
specifying http vs https connectivity.
Other code cleanup and comments
[US860520] -Deleted sys.out statements used in testing from
BusConsumer/BusPublisher
-Moved setProps method outside of if/else stub in BusConsumer
-Could not add useHttps or selfSignedCerts to toString methods in
BusConsumer
-Move setProperty(contenttype, app/json) call outside of if/else
branch in BusPublisher
-Renamed doesAllowSelfSignedCerts method to is isAllowSelfSignedCerts
and moved to BusTopicBase
-Moved allowSelfSignedCerts member variable to BusTopicBase class.
BusTopicBase is ancestor of
SingleThreadedDmapp*/SingleThreadedUeb*/SingleThreadedBus* classes
therefore, it will be inherited.
-This changed resulted in cascading changes to the subclasses of
BusTopicbase
-Default is now set to disallow self-signed certs
-Added several javadoc param annotations
[US869183 1707] Changes to satisfy dependencies and correct pom syntax
[ECOMPD2TD-1280] move DroolsPDPIntegrityMonitor.RestManager to its
own class
[ECOMPD2TD-1278] Resolve duplicate dependency issues
(cherry-pick from 1707.40.1)
[ECOMPD2TD-1278] revert cambriaClient back to 0.0.1
[ECOMPD2TD-1278] put dmaap version back to 0.2.12
[ECOMPD2TD-1280 1707] Fix Commons-lang3 version and
common-modules version
Conflicts:
policy-endpoints/pom.xml
policy-healthcheck/pom.xml
policy-persistence/pom.xml
pom.xml
Change-Id: I803554bc64c55d2e82a3d6ad9120757f287144fb
Signed-off-by: Ralph Straubs <rs8887@att.com>
Diffstat (limited to 'policy-endpoints/src/main')
14 files changed, 1177 insertions, 128 deletions
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java index 7a2e9711..a3c2230d 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java @@ -31,12 +31,7 @@ public interface TopicListener { * @param commType communication infrastructure type * @param topic topic name * @param event event message as a string - * - * @return boolean. True if the invoking event dispatcher should continue - * dispatching the event to subsequent listeners. False if it is requested - * to the invoking event dispatcher to stop dispatching the same event to - * other listeners of less priority. This mechanism is generally not used. */ - public boolean onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); + public void onTopicEvent(Topic.CommInfrastructure commType, String topic, String event); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java index 5b4cfd42..c3d02d14 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java @@ -24,17 +24,64 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; -import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink; import org.openecomp.policy.drools.properties.PolicyProperties; /** * DMAAP Topic Sink Factory */ public interface DmaapTopicSinkFactory { + public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + public final String DME2_VERSION_PROPERTY = "Version"; + public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; + + /** + * Instantiates a new DMAAP Topic Sink + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName AAF user name + * @param password AAF password + * @param partitionKey Consumer Group + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude + * @param additionalProps additional properties to pass to DME2 + * @param managed is this sink endpoint managed? + * + * @return an DMAAP Topic Sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String partitionKey, + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) ; /** * Instantiates a new DMAAP Topic Sink @@ -58,7 +105,9 @@ public interface DmaapTopicSinkFactory { String userName, String password, String partitionKey, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -142,7 +191,15 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { String userName, String password, String partitionKey, - boolean managed) + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { if (topic == null || topic.isEmpty()) { @@ -158,7 +215,45 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { new InlineDmaapTopicSink(servers, topic, apiKey, apiSecret, userName, password, - partitionKey); + partitionKey, + environment, aftEnvironment, + partner, latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + + if (managed) + dmaapTopicWriters.put(topic, dmaapTopicSink); + return dmaapTopicSink; + } + } + + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSink build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String partitionKey, + boolean managed, + boolean useHttps, boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (dmaapTopicWriters.containsKey(topic)) { + return dmaapTopicWriters.get(topic); + } + + DmaapTopicSink dmaapTopicSink = + new InlineDmaapTopicSink(servers, topic, + apiKey, apiSecret, + userName, password, + partitionKey, useHttps, allowSelfSignedCerts); if (managed) dmaapTopicWriters.put(topic, dmaapTopicSink); @@ -172,7 +267,7 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { */ @Override public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { - return this.build(servers, topic, null, null, null, null, null, true); + return this.build(servers, topic, null, null, null, null, null, true, false, false); } @@ -196,12 +291,10 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("No DMAAP servers provided in " + properties); - continue; - } - List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + List<String> serverList; + if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + else serverList = new ArrayList<>(); String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + @@ -223,14 +316,100 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = properties + .getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String,String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + if (dme2Version != null && !dme2Version.isEmpty()) + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + + if (servers == null || servers.isEmpty()) { + logger.error("No DMaaP servers or DME2 ServiceName provided"); + continue; + } + boolean managed = true; if (managedString != null && !managedString.isEmpty()) { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, - apiKey, apiSecret, aafMechId, aafPassword, - partitionKey, managed); + apiKey, apiSecret, + aafMechId, aafPassword, + partitionKey, + dme2Environment, dme2AftEnvironment, + dme2Partner, dme2Latitude, dme2Longitude, + dme2AdditionalProps, managed, useHttps, allowSelfSignedCerts); + dmaapTopicWriters.add(dmaapTopicSink); } return dmaapTopicWriters; @@ -305,4 +484,4 @@ class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory { return writers; } -} +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java index f8d85eb7..9f60556c 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java @@ -24,16 +24,26 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; import org.openecomp.policy.drools.properties.PolicyProperties; + /** * DMAAP Topic Source Factory */ public interface DmaapTopicSourceFactory { + public final String DME2_READ_TIMEOUT_PROPERTY = "AFT_DME2_EP_READ_TIMEOUT_MS"; + public final String DME2_EP_CONN_TIMEOUT_PROPERTY = "AFT_DME2_EP_CONN_TIMEOUT"; + public final String DME2_ROUNDTRIP_TIMEOUT_PROPERTY = "AFT_DME2_ROUNDTRIP_TIMEOUT_MS"; + public final String DME2_VERSION_PROPERTY = "Version"; + public final String DME2_ROUTE_OFFER_PROPERTY = "routeOffer"; + public final String DME2_SERVICE_NAME_PROPERTY = "ServiceName"; + public final String DME2_SUBCONTEXT_PATH_PROPERTY = "SubContextPath"; + public final String DME2_SESSION_STICKINESS_REQUIRED_PROPERTY = "sessionstickinessrequired"; /** * Creates an DMAAP Topic Source based on properties files @@ -60,6 +70,8 @@ public interface DmaapTopicSourceFactory { * @param fetchTimeout Read Fetch Timeout * @param fetchLimit Fetch Limit * @param managed is this endpoind managed? + * @param useHttps does the connection use HTTPS? + * @param allowSelfSignedCerts does connection allow self-signed certificates? * * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present @@ -74,7 +86,56 @@ public interface DmaapTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) + throws IllegalArgumentException; + + /** + * Instantiates a new DMAAP Topic Source + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param userName user name + * @param password password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Read Fetch Timeout + * @param fetchLimit Fetch Limit + * @param environment DME2 environment + * @param aftEnvironment DME2 AFT environment + * @param partner DME2 Partner + * @param latitude DME2 latitude + * @param longitude DME2 longitude + * @param additionalProps additional properties to pass to DME2 + * @param managed is this endpoind managed? + * @param useHttps does the connection use HTTPS? + * @param allowSelfSignedCerts does connection allow self-signed certificates? + * + * @return an DMAAP Topic Source + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -104,7 +165,7 @@ public interface DmaapTopicSourceFactory { * @return an DMAAP Topic Source * @throws IllegalArgumentException if invalid parameters are present */ - public DmaapTopicSource build(List<String> servers, + public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException; @@ -154,7 +215,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { */ protected HashMap<String, DmaapTopicSource> dmaapTopicSources = new HashMap<String, DmaapTopicSource>(); - + /** * {@inheritDoc} */ @@ -169,7 +230,15 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + String environment, + String aftEnvironment, + String partner, + String latitude, + String longitude, + Map<String,String> additionalProps, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { if (topic == null || topic.isEmpty()) { @@ -185,7 +254,53 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { new SingleThreadedDmaapTopicSource(servers, topic, apiKey, apiSecret, userName, password, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, + environment, aftEnvironment, partner, + latitude, longitude, additionalProps, useHttps, allowSelfSignedCerts); + + if (managed) + dmaapTopicSources.put(topic, dmaapTopicSource); + + return dmaapTopicSource; + } + } + /** + * {@inheritDoc} + */ + @Override + public DmaapTopicSource build(List<String> servers, + String topic, + String apiKey, + String apiSecret, + String userName, + String password, + String consumerGroup, + String consumerInstance, + int fetchTimeout, + int fetchLimit, + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("DMaaP Server(s) must be provided"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized(this) { + if (dmaapTopicSources.containsKey(topic)) { + return dmaapTopicSources.get(topic); + } + + DmaapTopicSource dmaapTopicSource = + new SingleThreadedDmaapTopicSource(servers, topic, + apiKey, apiSecret, userName, password, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts); if (managed) dmaapTopicSources.put(topic, dmaapTopicSource); @@ -216,12 +331,9 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - if (servers == null || servers.isEmpty()) { - logger.error("No UEB servers provided in " + properties); - continue; - } - - List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + List<String> serverList; + if (servers != null && !servers.isEmpty()) serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*"))); + else serverList = new ArrayList<>(); String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + @@ -250,6 +362,70 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX); + + /* DME2 Properties */ + + String dme2Environment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX); + + String dme2AftEnvironment = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX); + + String dme2Partner = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX); + + String dme2RouteOffer = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX); + + String dme2Latitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX); + + String dme2Longitude = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX); + + String dme2EpReadTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_READ_TIMEOUT_MS_SUFFIX); + + String dme2EpConnTimeout = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_EP_CONN_TIMEOUT_SUFFIX); + + String dme2RoundtripTimeoutMs = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUNDTRIP_TIMEOUT_MS_SUFFIX); + + String dme2Version = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_VERSION_SUFFIX); + + String dme2SubContextPath = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + + topic + PolicyProperties.PROPERTY_DMAAP_DME2_SUB_CONTEXT_PATH_SUFFIX); + + String dme2SessionStickinessRequired = properties + .getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_SESSION_STICKINESS_REQUIRED_SUFFIX); + + Map<String,String> dme2AdditionalProps = new HashMap<>(); + + if (dme2EpReadTimeoutMs != null && !dme2EpReadTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_READ_TIMEOUT_PROPERTY, dme2EpReadTimeoutMs); + if (dme2EpConnTimeout != null && !dme2EpConnTimeout.isEmpty()) + dme2AdditionalProps.put(DME2_EP_CONN_TIMEOUT_PROPERTY, dme2EpConnTimeout); + if (dme2RoundtripTimeoutMs != null && !dme2RoundtripTimeoutMs.isEmpty()) + dme2AdditionalProps.put(DME2_ROUNDTRIP_TIMEOUT_PROPERTY, dme2RoundtripTimeoutMs); + if (dme2Version != null && !dme2Version.isEmpty()) + dme2AdditionalProps.put(DME2_VERSION_PROPERTY, dme2Version); + if (dme2RouteOffer != null && !dme2RouteOffer.isEmpty()) + dme2AdditionalProps.put(DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + if (dme2SubContextPath != null && !dme2SubContextPath.isEmpty()) + dme2AdditionalProps.put(DME2_SUBCONTEXT_PATH_PROPERTY, dme2SubContextPath); + if (dme2SessionStickinessRequired != null && !dme2SessionStickinessRequired.isEmpty()) + dme2AdditionalProps.put(DME2_SESSION_STICKINESS_REQUIRED_PROPERTY, dme2SessionStickinessRequired); + + + if (servers == null || servers.isEmpty()) { + + logger.error("No DMaaP servers or DME2 ServiceName provided"); + continue; + } + int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH; if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) { try { @@ -279,10 +455,33 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + + DmaapTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, aafMechId, aafPassword, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, managed); + fetchTimeout, fetchLimit, + dme2Environment, dme2AftEnvironment, dme2Partner, + dme2Latitude, dme2Longitude, dme2AdditionalProps, + managed, useHttps, allowSelfSignedCerts); + dmaapTopicSource_s.add(uebTopicSource); } } @@ -291,25 +490,29 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { /** * {@inheritDoc} + * @throws IllegalArgumentException */ @Override public DmaapTopicSource build(List<String> servers, String topic, String apiKey, - String apiSecret) { + String apiSecret) throws IllegalArgumentException { return this.build(servers, topic, apiKey, apiSecret, null, null, null, null, DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH, DmaapTopicSource.DEFAULT_LIMIT_FETCH, - true); + true, + false, + false); } /** * {@inheritDoc} + * @throws IllegalArgumentException */ @Override - public DmaapTopicSource build(List<String> servers, String topic) { + public DmaapTopicSource build(List<String> servers, String topic) throws IllegalArgumentException { return this.build(servers, topic, null, null); } @@ -352,7 +555,7 @@ class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory { if (dmaapTopicSources.containsKey(topic)) { return dmaapTopicSources.get(topic); } else { - throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found"); + throw new IllegalArgumentException("DmaapTopiceSource for " + topic + " not found"); } } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java index 85b98838..432f035c 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java @@ -54,7 +54,9 @@ public interface UebTopicSinkFactory { String apiKey, String apiSecret, String partitionKey, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -135,9 +137,15 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { String apiKey, String apiSecret, String partitionKey, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } + if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException("A topic must be provided"); } @@ -149,7 +157,7 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { UebTopicSink uebTopicWriter = new InlineUebTopicSink(servers, topic, - apiKey, apiSecret,partitionKey); + apiKey, apiSecret,partitionKey, useHttps, allowSelfSignedCerts); if (managed) uebTopicSinks.put(topic, uebTopicWriter); @@ -164,7 +172,8 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { */ @Override public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException { - return this.build(servers, topic, null, null, null, true); + + return this.build(servers, topic, null, null, null, true, false, false); } @@ -212,9 +221,28 @@ class IndexedUebTopicSinkFactory implements UebTopicSinkFactory { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + UebTopicSink uebTopicWriter = this.build(serverList, topic, apiKey, apiSecret, - partitionKey, managed); + partitionKey, managed, useHttps, allowSelfSignedCerts); uebTopicWriters.add(uebTopicWriter); } return uebTopicWriters; diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java index bf2a4038..1729576f 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java @@ -71,7 +71,9 @@ public interface UebTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException; /** @@ -162,8 +164,13 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { String consumerInstance, int fetchTimeout, int fetchLimit, - boolean managed) + boolean managed, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { + if (servers == null || servers.isEmpty()) { + throw new IllegalArgumentException("UEB Server(s) must be provided"); + } if (topic == null || topic.isEmpty()) { throw new IllegalArgumentException("A topic must be provided"); @@ -178,7 +185,7 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { new SingleThreadedUebTopicSource(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); if (managed) uebTopicSources.put(topic, uebTopicSource); @@ -263,10 +270,28 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { managed = Boolean.parseBoolean(managedString); } + String useHttpsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX); + + //default is to use HTTP if no https property exists + boolean useHttps = false; + if (useHttpsString != null && !useHttpsString.isEmpty()){ + useHttps = Boolean.parseBoolean(useHttpsString); + } + + String allowSelfSignedCertsString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX); + + //default is to disallow self-signed certs + boolean allowSelfSignedCerts = false; + if (allowSelfSignedCertsString != null && !allowSelfSignedCertsString.isEmpty()){ + allowSelfSignedCerts = Boolean.parseBoolean(allowSelfSignedCertsString); + } + UebTopicSource uebTopicSource = this.build(serverList, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, managed); + fetchTimeout, fetchLimit, managed, useHttps, allowSelfSignedCerts); uebTopicSources.add(uebTopicSource); } } @@ -281,11 +306,12 @@ class IndexedUebTopicSourceFactory implements UebTopicSourceFactory { String topic, String apiKey, String apiSecret) { + return this.build(servers, topic, apiKey, apiSecret, null, null, UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH, - UebTopicSource.DEFAULT_LIMIT_FETCH, true); + UebTopicSource.DEFAULT_LIMIT_FETCH, true, false, true); } /** diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java index 6fee5ce0..a34d361b 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -22,12 +22,20 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.net.MalformedURLException; import java.security.GeneralSecurityException; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory; +import org.openecomp.policy.drools.properties.PolicyProperties; + import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaConsumer; +import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.response.MRConsumerResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; @@ -76,17 +84,40 @@ public interface BusConsumer { public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) + int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) throws IllegalArgumentException { ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); - builder.knownAs(consumerGroup, consumerInstance) + + if (useHttps){ + + if(useSelfSignedCerts){ + builder.knownAs(consumerGroup, consumerInstance) + .usingHosts(servers) + .onTopic(topic) + .waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit) + .usingHttps() + .allowSelfSignedCertificates(); + } + else{ + builder.knownAs(consumerGroup, consumerInstance) + .usingHosts(servers) + .onTopic(topic) + .waitAtServer(fetchTimeout) + .receivingAtMost(fetchLimit) + .usingHttps(); + } + } + else{ + builder.knownAs(consumerGroup, consumerInstance) .usingHosts(servers) .onTopic(topic) .waitAtServer(fetchTimeout) .receivingAtMost(fetchLimit); + } if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { @@ -123,8 +154,11 @@ public interface BusConsumer { /** * MR based consumer */ - public static class DmaapConsumerWrapper implements BusConsumer { + public abstract class DmaapConsumerWrapper implements BusConsumer { + protected int fetchTimeout; + protected Object closeCondition = new Object(); + /** * MR Consumer */ @@ -137,47 +171,75 @@ public interface BusConsumer { * @param topic topic * @param apiKey API Key * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password + * @param username AAF Login + * @param password AAF Password * @param consumerGroup Consumer Group * @param consumerInstance Consumer Instance * @param fetchTimeout Fetch Timeout * @param fetchLimit Fetch Limit + * @throws MalformedURLException */ + @SuppressWarnings("unchecked") public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, - String aafLogin, String aafPassword, + String username, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) - throws Exception { + int fetchTimeout, int fetchLimit, boolean useHttps) + + throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, fetchTimeout, fetchLimit, null, apiKey, apiSecret); - this.consumer.setUsername(aafLogin); - this.consumer.setPassword(aafPassword); + this.consumer.setUsername(username); + this.consumer.setPassword(password); + - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - Properties props = new Properties(); - props.setProperty("Protocol", "http"); - this.consumer.setProps(props); - this.consumer.setHost(servers.get(0) + ":3904");; } /** * {@inheritDoc} */ public Iterable<String> fetch() throws Exception { - return this.consumer.fetch(); + MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + + if (PolicyLogger.isDebugEnabled() && response != null) + PolicyLogger.debug(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage()); + + if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) { + if (response.getResponseCode() == null) + PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received response code null"); + else + PolicyLogger.error(DmaapConsumerWrapper.class.getName(), "DMaaP consumer received " + response.getResponseCode() + ": " + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + } + + if (response.getActualMessages() == null) + return new ArrayList<String>(); + else + return response.getActualMessages(); } /** * {@inheritDoc} */ public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + this.consumer.close(); } @@ -196,7 +258,181 @@ public interface BusConsumer { } } + /** + * MR based consumer + */ + public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + private Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String aafLogin, String aafPassword, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, + aafLogin, aafPassword, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && servers.get(0).equals("")) || + (servers == null) || (servers.size() == 0)) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if(useHttps){ + props.setProperty("Protocol", "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } + else{ + props.setProperty("Protocol", "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this); + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + MRConsumerImpl consumer = (MRConsumerImpl) this.consumer; + + builder. + append("DmaapConsumerWrapper ["). + append("consumer.getAuthDate()=").append(consumer.getAuthDate()). + append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). + append(", consumer.getHost()=").append(consumer.getHost()). + append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). + append(", consumer.getUsername()=").append(consumer.getUsername()). + append("]"); + return builder.toString(); + } + } + public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + private Properties props; + + public DmaapDmeConsumerWrapper(List<String> servers, String topic, + String apiKey, String apiSecret, + String dme2Login, String dme2Password, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String dme2Partner, + String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, + dme2Login, dme2Password, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps); + + + String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if(useHttps){ + props.setProperty("Protocol", "https"); + + } + else{ + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for(String key : additionalProps.keySet()) + props.put(key, additionalProps.get(key)); + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + PolicyLogger.info(DmaapConsumerWrapper.class.getName(), "CREATION: " + this); + } + } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java index 798bf989..b5595b2d 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java @@ -24,14 +24,20 @@ import java.net.MalformedURLException; import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.openecomp.policy.common.logging.eelf.PolicyLogger; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSinkFactory; +import org.openecomp.policy.drools.properties.PolicyProperties; +import org.slf4j.LoggerFactory; + import com.att.nsa.cambria.client.CambriaBatchingPublisher; import com.att.nsa.cambria.client.CambriaClientBuilders; import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -65,15 +71,21 @@ public interface BusPublisher { public CambriaPublisherWrapper(List<String> servers, String topic, String apiKey, - String apiSecret) - throws IllegalArgumentException { + String apiSecret, boolean useHttps) throws IllegalArgumentException { PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder(); - - builder.usingHosts(servers) + + + if (useHttps){ + + builder.usingHosts(servers) + .onTopic(topic) + .usingHttps(); + } + else{ + builder.usingHosts(servers) .onTopic(topic); - - // Only supported in 0.2.4 version - // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER); + } + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { @@ -142,43 +154,105 @@ public interface BusPublisher { /** * DmaapClient library wrapper */ - public static class DmaapPublisherWrapper implements BusPublisher { + public abstract class DmaapPublisherWrapper implements BusPublisher { /** * MR based Publisher */ protected MRSimplerBatchPublisher publisher; + protected Properties props; - public DmaapPublisherWrapper(List<String> servers, String topic, - String aafLogin, - String aafPassword) { + /** + * MR Publisher Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param username AAF or DME2 Login + * @param password AAF or DME2 Password + */ + public DmaapPublisherWrapper(ProtocolTypeConstants protocol, + List<String> servers, String topic, + String username, + String password, boolean useHttps) throws IllegalArgumentException { + - ArrayList<String> dmaapServers = new ArrayList<String>(); - for (String server: servers) { - dmaapServers.add(server + ":3904"); + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); } - - this.publisher = - new MRSimplerBatchPublisher.Builder(). - againstUrls(dmaapServers). - onTopic(topic). - build(); - this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + if (protocol == ProtocolTypeConstants.AAF_AUTH) { + if (servers == null || servers.isEmpty()) + throw new IllegalArgumentException("No DMaaP servers or DME2 partner provided"); + + ArrayList<String> dmaapServers = new ArrayList<String>(); + if(useHttps){ + for (String server: servers) { + dmaapServers.add(server + ":3905"); + } + + } + else{ + for (String server: servers) { + dmaapServers.add(server + ":3904"); + } + } + + + this.publisher = + new MRSimplerBatchPublisher.Builder(). + againstUrls(dmaapServers). + onTopic(topic). + build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + } else if (protocol == ProtocolTypeConstants.DME2) { + ArrayList<String> dmaapServers = new ArrayList<String>(); + dmaapServers.add("0.0.0.0:3904"); + + this.publisher = + new MRSimplerBatchPublisher.Builder(). + againstUrls(dmaapServers). + onTopic(topic). + build(); + + this.publisher.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + } + + this.publisher.logTo(LoggerFactory.getLogger(MRSimplerBatchPublisher.class.getName())); - this.publisher.setUsername(aafLogin); - this.publisher.setPassword(aafPassword); + this.publisher.setUsername(username); + this.publisher.setPassword(password); - Properties props = new Properties(); - props.setProperty("Protocol", "http"); + props = new Properties(); + + if(useHttps){ + + props.setProperty("Protocol", "https"); + } + else{ + + props.setProperty("Protocol", "http"); + + } + props.setProperty("contenttype", "application/json"); + props.setProperty("username", username); + props.setProperty("password", password); + + props.setProperty("topic", topic); this.publisher.setProps(props); - this.publisher.setHost(servers.get(0)); + if (protocol == ProtocolTypeConstants.AAF_AUTH) + this.publisher.setHost(servers.get(0)); - if (PolicyLogger.isInfoEnabled()) + if (PolicyLogger.isInfoEnabled()) { PolicyLogger.info(DmaapPublisherWrapper.class.getName(), "CREATION: " + this); + PolicyLogger.info(DmaapPublisherWrapper.class.getName(), + "BusPublisher.DmaapPublisherWrapper using Protocol: " + protocol.getValue()); + } + } /** @@ -208,7 +282,16 @@ public interface BusPublisher { if (message == null) throw new IllegalArgumentException("No message provided"); + this.publisher.setPubResponse(new MRPublisherResponse()); this.publisher.send(partitionId, message); + MRPublisherResponse response = this.publisher.sendBatchWithResponse(); + if (PolicyLogger.isDebugEnabled() && response != null) { + PolicyLogger.debug(DmaapPublisherWrapper.class.getName(), + "DMaaP publisher received " + response.getResponseCode() + ": " + + response.getResponseMessage()); + + } + return true; } @@ -227,5 +310,97 @@ public interface BusPublisher { return builder.toString(); } } + + /** + * DmaapClient library wrapper + */ + public static class DmaapAafPublisherWrapper extends DmaapPublisherWrapper { + /** + * MR based Publisher + */ + protected MRSimplerBatchPublisher publisher; + + public DmaapAafPublisherWrapper(List<String> servers, String topic, + String aafLogin, + String aafPassword, boolean useHttps) { + + super(ProtocolTypeConstants.AAF_AUTH, servers, topic, aafLogin, aafPassword, useHttps); + } + } + + public static class DmaapDmePublisherWrapper extends DmaapPublisherWrapper { + public DmaapDmePublisherWrapper(List<String> servers, String topic, + String username, String password, + String environment, String aftEnvironment, String dme2Partner, + String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) { + + super(ProtocolTypeConstants.DME2, servers, topic, username, password, useHttps); + + + + + + + String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); + } if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + String serviceName = servers.get(0); + + /* These are required, no defaults */ + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + // ServiceName also a default, found in additionalProps + + /* These are optional, will default to these values if not set in optionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "POST"); + + for (String key : additionalProps.keySet()) { + String value = additionalProps.get(key); + + if (value != null) + props.setProperty(key, value); + } + + this.publisher.setProps(props); + } + } } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java index e36e3afc..4ac1c6fc 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java @@ -23,7 +23,6 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.util.List; import org.apache.commons.collections4.queue.CircularFifoQueue; - import org.openecomp.policy.drools.event.comm.Topic; import org.openecomp.policy.drools.event.comm.bus.BusTopic; @@ -35,13 +34,30 @@ public abstract class BusTopicBase implements BusTopic, Topic { protected String apiKey; protected String apiSecret; + protected boolean useHttps; + protected boolean allowSelfSignedCerts; protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10); + /** + * Instantiates a new Bus Topic Base + * + * @param servers list of servers + * @param topic topic name + * @param apiKey API Key + * @param apiSecret API Secret + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @return a Bus Topic Base + * @throws IllegalArgumentException if invalid parameters are present + */ public BusTopicBase(List<String> servers, String topic, String apiKey, - String apiSecret) + String apiSecret, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { if (servers == null || servers.isEmpty()) { @@ -57,6 +73,8 @@ public abstract class BusTopicBase implements BusTopic, Topic { this.apiKey = apiKey; this.apiSecret = apiSecret; + this.useHttps = useHttps; + this.allowSelfSignedCerts = allowSelfSignedCerts; } /** @@ -92,6 +110,20 @@ public abstract class BusTopicBase implements BusTopic, Topic { } /** + * @return useHttps + */ + public boolean isUseHttps(){ + return useHttps; + } + + /** + * @return allowSelfSignedCerts + */ + public boolean isAllowSelfSignedCerts(){ + return allowSelfSignedCerts; + } + + /** * @return the recentEvents */ @Override @@ -104,8 +136,13 @@ public abstract class BusTopicBase implements BusTopic, Topic { @Override public String toString() { StringBuilder builder = new StringBuilder(); - builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=") - .append(apiKey).append(", apiSecret=").append(apiSecret).append("]"); + builder.append("UebTopicBase [servers=").append(servers) + .append(", topic=").append(topic) + .append(", apiKey=").append(apiKey) + .append(", apiSecret=").append(apiSecret) + .append(", useHttps=").append(useHttps) + .append(", allowSelfSignedCerts=").append(allowSelfSignedCerts) + .append("]"); return builder.toString(); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java index bd88818b..a78de716 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java @@ -83,13 +83,15 @@ public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopi * @param apiKey api secret * @param apiSecret api secret * @param partitionId partition id + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow * @throws IllegalArgumentException in invalid parameters are passed in */ public InlineBusTopicSink(List<String> servers, String topic, - String apiKey, String apiSecret, String partitionId) + String apiKey, String apiSecret, String partitionId, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret); + super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); if (partitionId == null || partitionId.isEmpty()) { this.partitionId = UUID.randomUUID ().toString(); diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java index 417c6d47..f5a3dc11 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java @@ -21,11 +21,12 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.util.List; +import java.util.Map; -import org.openecomp.policy.drools.event.comm.Topic; -import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; import org.openecomp.policy.common.logging.flexlogger.FlexLogger; import org.openecomp.policy.common.logging.flexlogger.Logger; +import org.openecomp.policy.drools.event.comm.Topic; +import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink; /** * This implementation publishes events for the associated DMAAP topic, @@ -39,13 +40,68 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop protected final String userName; protected final String password; + protected String environment = null; + protected String aftEnvironment = null; + protected String partner = null; + protected String latitude = null; + protected String longitude = null; + + protected Map<String,String> additionalProps = null; + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param environment DME2 Environment + * @param aftEnvironment DME2 AFT Environment + * @param partner DME2 Partner + * @param latitude DME2 Latitude + * @param longitude DME2 Longitude + * @param additionalProps Additional properties to pass to DME2 + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + public InlineDmaapTopicSink(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String partitionKey, + String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String,String> additionalProps, + boolean useHttps, boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + this.environment = environment; + this.aftEnvironment = aftEnvironment; + this.partner = partner; + + this.latitude = latitude; + this.longitude = longitude; + + this.additionalProps = additionalProps; + + this.init(); + } + public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName, String password, - String partitionKey) + String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret, partitionKey); + super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts); this.userName = userName; this.password = password; @@ -54,11 +110,24 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop @Override public void init() { - this.publisher = - new BusPublisher.DmaapPublisherWrapper(this.servers, + if ((this.environment == null || this.environment.isEmpty()) && + (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) && + (this.latitude == null || this.latitude.isEmpty()) && + (this.longitude == null || this.longitude.isEmpty()) && + (this.partner == null || this.partner.isEmpty())) { + this.publisher = + new BusPublisher.DmaapAafPublisherWrapper(this.servers, this.topic, this.userName, - this.password); + this.password, this.useHttps); + } else { + this.publisher = + new BusPublisher.DmaapDmePublisherWrapper(this.servers, this.topic, + this.userName, this.password, + this.environment, this.aftEnvironment, + this.partner, this.latitude, this.longitude, + this.additionalProps, this.useHttps); + } if (logger.isInfoEnabled()) logger.info("DMAAP SINK TOPIC created " + this); } @@ -81,4 +150,4 @@ public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTop return builder.toString(); } -} +}
\ No newline at end of file diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java index 2d4b1552..c93e0f2b 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java @@ -46,6 +46,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi * @param apiKey the api key (optional) * @param apiSecret the api secret (optional) * @param partitionId the partition key (optional, autogenerated if not provided) + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow * * @throws IllegalArgumentException if invalid arguments are detected */ @@ -53,9 +55,11 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi String topic, String apiKey, String apiSecret, - String partitionId) + String partitionId, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret, partitionId); + super(servers, topic, apiKey, apiSecret, partitionId, useHttps, allowSelfSignedCerts); } /** @@ -68,7 +72,8 @@ public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSi new BusPublisher.CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, - this.apiSecret); + this.apiSecret, + this.useHttps); if (logger.isInfoEnabled()) logger.info("UEB SINK TOPIC created " + this); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java index f37c349e..d3be9163 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java @@ -98,6 +98,7 @@ public abstract class SingleThreadedBusTopicSource */ protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>(); + /** * * @param servers Bus servers @@ -108,6 +109,8 @@ public abstract class SingleThreadedBusTopicSource * @param consumerInstance Bus Reader Instance * @param fetchTimeout Bus fetch timeout * @param fetchLimit Bus fetch limit + * @param useHttps does the bus use https + * @param allowSelfSignedCerts are self-signed certificates allowed * @throws IllegalArgumentException An invalid parameter passed in */ public SingleThreadedBusTopicSource(List<String> servers, @@ -117,10 +120,12 @@ public abstract class SingleThreadedBusTopicSource String consumerGroup, String consumerInstance, int fetchTimeout, - int fetchLimit) + int fetchLimit, + boolean useHttps, + boolean allowSelfSignedCerts) throws IllegalArgumentException { - super(servers, topic, apiKey, apiSecret); + super(servers, topic, apiKey, apiSecret, useHttps, allowSelfSignedCerts); if (consumerGroup == null || consumerGroup.isEmpty()) { this.consumerGroup = UUID.randomUUID ().toString(); @@ -145,6 +150,7 @@ public abstract class SingleThreadedBusTopicSource } else { this.fetchLimit = fetchLimit; } + } /** diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java index e65d44a7..2ced5bcb 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java @@ -21,10 +21,11 @@ package org.openecomp.policy.drools.event.comm.bus.internal; import java.util.List; +import java.util.Map; +import org.openecomp.policy.common.logging.eelf.PolicyLogger; import org.openecomp.policy.drools.event.comm.Topic; import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource; -import org.openecomp.policy.common.logging.eelf.PolicyLogger; /** * This topic reader implementation specializes in reading messages @@ -33,10 +34,21 @@ import org.openecomp.policy.common.logging.eelf.PolicyLogger; public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource implements DmaapTopicSource, Runnable { + + protected boolean allowSelfSignedCerts; protected final String userName; protected final String password; private String className = SingleThreadedDmaapTopicSource.class.getName(); + + protected String environment = null; + protected String aftEnvironment = null; + protected String partner = null; + protected String latitude = null; + protected String longitude = null; + + protected Map<String,String> additionalProps = null; + /** * * @param servers DMaaP servers @@ -47,19 +59,75 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource * @param consumerInstance DMaaP Reader Instance * @param fetchTimeout DMaaP fetch timeout * @param fetchLimit DMaaP fetch limit + * @param environment DME2 Environment + * @param aftEnvironment DME2 AFT Environment + * @param partner DME2 Partner + * @param latitude DME2 Latitude + * @param longitude DME2 Longitude + * @param additionalProps Additional properties to pass to DME2 + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow + * + * @throws IllegalArgumentException An invalid parameter passed in + */ + public SingleThreadedDmaapTopicSource(List<String> servers, String topic, + String apiKey, String apiSecret, + String userName, String password, + String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, + String environment, String aftEnvironment, String partner, + String latitude, String longitude, Map<String,String> additionalProps, + boolean useHttps, boolean allowSelfSignedCerts) + throws IllegalArgumentException { + + super(servers, topic, apiKey, apiSecret, + consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, useHttps,allowSelfSignedCerts); + + this.userName = userName; + this.password = password; + + this.environment = environment; + this.aftEnvironment = aftEnvironment; + this.partner = partner; + + this.latitude = latitude; + this.longitude = longitude; + + this.additionalProps = additionalProps; + try { + this.init(); + } catch (Exception e) { + e.printStackTrace(); + throw new IllegalArgumentException(e); + } + } + + /** + * + * @param servers DMaaP servers + * @param topic DMaaP Topic to be monitored + * @param apiKey DMaaP API Key (optional) + * @param apiSecret DMaaP API Secret (optional) + * @param consumerGroup DMaaP Reader Consumer Group + * @param consumerInstance DMaaP Reader Instance + * @param fetchTimeout DMaaP fetch timeout + * @param fetchLimit DMaaP fetch limit + * @param useHttps does connection use HTTPS? + * @param allowSelfSignedCerts are self-signed certificates allow * @throws IllegalArgumentException An invalid parameter passed in */ public SingleThreadedDmaapTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, String userName, String password, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) + int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); this.userName = userName; this.password = password; @@ -78,22 +146,35 @@ public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource */ @Override public void init() throws Exception { - if (this.userName == null || this.userName.isEmpty() || - this.password == null || this.password.isEmpty()) { - this.consumer = - new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit); - } else { - this.consumer = - new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic, - this.apiKey, this.apiSecret, - this.userName, this.password, - this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit); - } + this.password == null || this.password.isEmpty()) { + this.consumer = + new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, + this.useHttps, this.allowSelfSignedCerts); + } else if ((this.environment == null || this.environment.isEmpty()) && + (this.aftEnvironment == null || this.aftEnvironment.isEmpty()) && + (this.latitude == null || this.latitude.isEmpty()) && + (this.longitude == null || this.longitude.isEmpty()) && + (this.partner == null || this.partner.isEmpty())) { + this.consumer = + new BusConsumer.DmaapAafConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.userName, this.password, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, this.useHttps); + } else { + this.consumer = + new BusConsumer.DmaapDmeConsumerWrapper(this.servers, this.topic, + this.apiKey, this.apiSecret, + this.userName, this.password, + this.consumerGroup, this.consumerInstance, + this.fetchTimeout, this.fetchLimit, + this.environment, this.aftEnvironment, this.partner, + this.latitude, this.longitude, this.additionalProps, this.useHttps); + } PolicyLogger.info(className, "CREATION: " + this); } diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java index edb55c75..641eddaa 100644 --- a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java +++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java @@ -42,18 +42,24 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource * @param consumerInstance UEB Reader Instance * @param fetchTimeout UEB fetch timeout * @param fetchLimit UEB fetch limit + * @param useHttps does topicSource use HTTPS? + * @param allowSelfSignedCerts does topicSource allow self-signed certs? + * * @throws IllegalArgumentException An invalid parameter passed in */ + + public SingleThreadedUebTopicSource(List<String> servers, String topic, String apiKey, String apiSecret, String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit) + int fetchTimeout, int fetchLimit, boolean useHttps, boolean allowSelfSignedCerts) throws IllegalArgumentException { super(servers, topic, apiKey, apiSecret, consumerGroup, consumerInstance, - fetchTimeout, fetchLimit); + fetchTimeout, fetchLimit, useHttps, allowSelfSignedCerts); + this.allowSelfSignedCerts = allowSelfSignedCerts; this.init(); } @@ -67,7 +73,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, this.apiKey, this.apiSecret, this.consumerGroup, this.consumerInstance, - this.fetchTimeout, this.fetchLimit); + this.fetchTimeout, this.fetchLimit, this.useHttps, this.allowSelfSignedCerts); } /** @@ -77,6 +83,7 @@ public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource public CommInfrastructure getTopicCommInfrastructure() { return Topic.CommInfrastructure.UEB; } + @Override public String toString() { |