diff options
36 files changed, 4247 insertions, 1409 deletions
diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 30e60521..00000000 --- a/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -.DS_Store -.project -.settings -.classpath -.jupiter -.pydevproject -*.swp -*.log -*.out -.metadata/ -target/ -*/logs/ -*/sql/ -*/testingLogs/ -*/config/ diff --git a/api-state-management/pom.xml b/api-state-management/pom.xml new file mode 100644 index 00000000..f5c1e21e --- /dev/null +++ b/api-state-management/pom.xml @@ -0,0 +1,62 @@ +<!-- + ============LICENSE_START======================================================= + ONAP Policy Engine - Drools PDP + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>drools-pdp</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <artifactId>api-state-management</artifactId> + + <name>api-state-management</name> + <description>Separately loadable module for state management APIe</description> + + <properties> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + <swagger.version>1.5.0</swagger.version> + </properties> + + <build> + <plugins> + <!-- none --> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>policy-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.policy.common</groupId> + <artifactId>integrity-monitor</artifactId> + <version>${common-modules.version}</version> + </dependency> + </dependencies> +</project> diff --git a/api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java b/api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java new file mode 100644 index 00000000..a6d808ca --- /dev/null +++ b/api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java @@ -0,0 +1,182 @@ +/*- + * ============LICENSE_START======================================================= + * policy-core + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import java.util.Observer; + +import org.onap.policy.common.im.StandbyStatusException; +import org.onap.policy.common.im.StateManagement; +import org.onap.policy.drools.properties.Lockable; +import org.onap.policy.drools.utils.OrderedService; +import org.onap.policy.drools.utils.OrderedServiceImpl; + +/** + * This interface provides a way to invoke optional features at various + * points in the code. At appropriate points in the + * application, the code iterates through this list, invoking these optional + * methods. Most of the methods here are notification only -- these tend to + * return a 'void' value. In other cases, such as 'activatePolicySession', + * may + */ +public interface StateManagementFeatureAPI extends OrderedService, Lockable +{ + + public static final String LOCKED = StateManagement.LOCKED; + public static final String UNLOCKED = StateManagement.UNLOCKED; + public static final String ENABLED = StateManagement.ENABLED; + public static final String DISABLED = StateManagement.DISABLED; + public static final String ENABLE_NOT_FAILED = StateManagement.ENABLE_NOT_FAILED; + public static final String DISABLE_FAILED = StateManagement.DISABLE_FAILED; + public static final String FAILED = StateManagement.FAILED; + public static final String DEPENDENCY = StateManagement.DEPENDENCY; + public static final String DEPENDENCY_FAILED = StateManagement.DEPENDENCY_FAILED; + public static final String DISABLE_DEPENDENCY = StateManagement.DISABLE_DEPENDENCY; + public static final String ENABLE_NO_DEPENDENCY = StateManagement.ENABLE_NO_DEPENDENCY; + public static final String NULL_VALUE = StateManagement.NULL_VALUE; + public static final String LOCK = StateManagement.LOCK; + public static final String UNLOCK = StateManagement.UNLOCK; + public static final String PROMOTE = StateManagement.PROMOTE; + public static final String DEMOTE = StateManagement.DEMOTE; + public static final String HOT_STANDBY = StateManagement.HOT_STANDBY; + public static final String COLD_STANDBY = StateManagement.COLD_STANDBY; + public static final String PROVIDING_SERVICE = StateManagement.PROVIDING_SERVICE; + + public static final String ADMIN_STATE = StateManagement.ADMIN_STATE; + public static final String OPERATION_STATE = StateManagement.OPERATION_STATE; + public static final String AVAILABLE_STATUS= StateManagement.AVAILABLE_STATUS; + public static final String STANDBY_STATUS = StateManagement.STANDBY_STATUS; + + public static final int SEQ_NUM = 0; + /** + * 'FeatureAPI.impl.getList()' returns an ordered list of objects + * implementing the 'FeatureAPI' interface. + */ + static public OrderedServiceImpl<StateManagementFeatureAPI> impl = + new OrderedServiceImpl<StateManagementFeatureAPI>(StateManagementFeatureAPI.class); + + /** + * This method is called to add an Observer to receive notifications of state changes + * + * @param stateChangeObserver + */ + public void addObserver(Observer stateChangeObserver); + + /** + * This method returns the X.731 Administrative State for this resource + * + * @return String (locked, unlocked) + */ + public String getAdminState(); + + /** + * This method returns the X.731 Operational State for this resource + * + * @return String (enabled, disabled) + */ + public String getOpState(); + + /** + * This method returns the X.731 Availability Status for this resource + * + * @return String (failed; dependency; dependency,failed) + */ + public String getAvailStatus(); + + /** + * This method returns the X.731 Standby Status for this resource + * + * @return String (providingservice, hotstandby or coldstandby) + */ + public String getStandbyStatus(); + + /** + * This method returns the X.731 Standby Status for the named resource + * @param String (resourceName) + * @return String (providingservice, hotstandby or coldstandby) + */ + public String getStandbyStatus(String resourceName); + + /** + * This method moves the X.731 Operational State for the named resource + * into a value of disabled and the Availability Status to a value of failed. + * As a consequence the Standby Status value will take a value of coldstandby. + * + * @param String (resourceName) + * @throws Exception + */ + public void disableFailed(String resourceName) throws Exception; + + /** + * This method moves the X.731 Operational State for this resource + * into a value of disabled and the Availability Status to a value of failed. + * As a consequence the Standby Status value will take a value of coldstandby. + * + * @param String (resourceName) + * @throws Exception + */ + public void disableFailed() throws Exception; + + /** + * This method moves the X.731 Standby Status for this resource from hotstandby + * to providingservice. If the current value is coldstandby, no change is made. + * If the current value is null, it will move to providingservice assuming the + * Operational State is enabled and Administrative State is unlocked. + * @throws Exception + * @throws StandbyStatusException + */ + public void promote() throws StandbyStatusException, Exception; + + /** + * This method moves the X.731 Standby Status for this resource from providingservice + * to hotstandby. If the current value is null, it will move to hotstandby assuming the + * Operational State is enabled and Administrative State is unlocked. Else, it will move + * to coldstandby + * @throws Exception + */ + public void demote() throws Exception; + + /** + * This method returns the resourceName associated with this instance of the StateManagementFeature + * @return String (resourceName) + */ + public String getResourceName(); + + /** + * This Lockable method will lock the StateManagement object Admin state + * @return true if successfull, false otherwise + */ + @Override + public boolean lock(); + + /** + * This Lockable method will unlock the StateManagement object Admin state + * @return true if successfull, false otherwise + */ + @Override + public boolean unlock(); + + /** + * This Lockable method indicates the Admin state StateManagement object + * @return true if locked, false otherwise + */ + @Override + public boolean isLocked(); +} diff --git a/feature-session-persistence/src/assembly/assemble_zip.xml b/feature-session-persistence/src/assembly/assemble_zip.xml index 1cc3ce5a..8a315960 100644 --- a/feature-session-persistence/src/assembly/assemble_zip.xml +++ b/feature-session-persistence/src/assembly/assemble_zip.xml @@ -24,7 +24,7 @@ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>session-persistence</id> + <id>feature-session-persistence</id> <formats> <format>zip</format> </formats> diff --git a/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql b/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql index 0e980968..5d4993fe 100644 --- a/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql +++ b/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql @@ -51,10 +51,4 @@ WORKITEMBYTEARRAY BLOB, PRIMARY KEY (WORKITEMID) ); -CREATE TABLE IF NOT EXISTS sessionpersistence.SESSIONINFO_ID_SEQ (next_val bigint) engine=MyISAM; -INSERT INTO sessionpersistence.SESSIONINFO_ID_SEQ (next_val) SELECT 1 WHERE NOT EXISTS (SELECT * FROM sessionpersistence.SESSIONINFO_ID_SEQ); - -CREATE TABLE IF NOT EXISTS sessionpersistence.WORKITEMINFO_ID_SEQ (next_val bigint) engine=MyISAM; -INSERT INTO sessionpersistence.WORKITEMINFO_ID_SEQ (next_val) SELECT 1 WHERE NOT EXISTS (SELECT * FROM sessionpersistence.WORKITEMINFO_ID_SEQ); - set foreign_key_checks=1;
\ No newline at end of file diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java index e6603b68..b48690b0 100644 --- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java +++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java @@ -732,7 +732,7 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine * before firing rules again. This is a "half" time, so that * we can multiply it by two without overflowing the word size. */ - long halfMaxSleepTime = 5000 / 2; + long halfMaxSleepTime = 5000L / 2L; /** * Constructor - initialize variables and create thread @@ -819,6 +819,7 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine } catch (InterruptedException e) { logger.error("stopThread exception: ", e); + Thread.currentThread().interrupt(); } // verify that it's done @@ -866,11 +867,9 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine // no rules fired -- increase poll delay sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime); } - - } catch (Throwable e) { - logger.error("startThread exception: ", e); - } - + } catch (Exception | LinkageError e) { + logger.error("Exception during kieSession.fireAllRules", e); + } try { if(stopped.await(sleepTime, TimeUnit.MILLISECONDS)) { @@ -879,6 +878,7 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine } catch (InterruptedException e) { logger.error("startThread exception: ", e); + Thread.currentThread().interrupt(); break; } } diff --git a/feature-state-management/pom.xml b/feature-state-management/pom.xml new file mode 100644 index 00000000..5265cdbb --- /dev/null +++ b/feature-state-management/pom.xml @@ -0,0 +1,137 @@ +<!-- + ============LICENSE_START======================================================= + feature-state-management + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>drools-pdp</artifactId> + <version>1.1.0-SNAPSHOT</version> + </parent> + + <artifactId>feature-state-management</artifactId> + + <name>feature-state-management</name> + <description>Separately loadable module for State Management</description> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>zipfile</id> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + <configuration> + <attach>true</attach> + <finalName>${project.artifactId}-${project.version}</finalName> + <descriptors> + <descriptor>src/assembly/assemble_zip.xml</descriptor> + </descriptors> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>prepare-package</phase> + <configuration> + <transitive>false</transitive> + <outputDirectory>${project.build.directory}/assembly/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>true</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <useRepositoryLayout>false</useRepositoryLayout> + <addParentPoms>false</addParentPoms> + <copyPom>false</copyPom> + <includeScope>runtime</includeScope> + <excludeTransitive>true</excludeTransitive> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>io.swagger</groupId> + <artifactId>swagger-jersey2-jaxrs</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>policy-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>policy-management</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>api-state-management</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <version>[1.4.186,)</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.eclipse.persistence</groupId> + <artifactId>eclipselink</artifactId> + <scope>provided</scope> + </dependency> + <!-- Need to pull in to assembly --> + <dependency> + <groupId>org.onap.policy.common</groupId> + <artifactId>integrity-monitor</artifactId> + <version>${common-modules.version}</version> + </dependency> + <!-- Need to pull into assembly for IntegrityMonitor --> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <version>1.2.17</version> + </dependency> + </dependencies> +</project> diff --git a/feature-state-management/src/assembly/assemble_zip.xml b/feature-state-management/src/assembly/assemble_zip.xml new file mode 100644 index 00000000..f398829d --- /dev/null +++ b/feature-state-management/src/assembly/assemble_zip.xml @@ -0,0 +1,76 @@ +<!-- + ============LICENSE_START======================================================= + feature-state-management + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<!-- Defines how we build the .zip file which is our distribution. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>feature-state-management</id> + <formats> + <format>zip</format> + </formats> + + <!-- we want "system" and related files right at the root level as this + file is suppose to be unzip on top of a karaf distro. --> + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>target</directory> + <outputDirectory>lib/feature</outputDirectory> + <includes> + <include>feature-state-management-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>target/assembly/lib</directory> + <outputDirectory>lib/dependencies</outputDirectory> + <includes> + <include>*.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>src/main/feature/config</directory> + <outputDirectory>config</outputDirectory> + <fileMode>0644</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/bin</directory> + <outputDirectory>bin</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/db</directory> + <outputDirectory>db</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + <fileSet> + <directory>src/main/feature/install</directory> + <outputDirectory>install</outputDirectory> + <fileMode>0744</fileMode> + <excludes/> + </fileSet> + </fileSets> +</assembly> diff --git a/feature-state-management/src/main/feature/config/feature-state-management.properties b/feature-state-management/src/main/feature/config/feature-state-management.properties new file mode 100644 index 00000000..72c1fe22 --- /dev/null +++ b/feature-state-management/src/main/feature/config/feature-state-management.properties @@ -0,0 +1,82 @@ +### +# ============LICENSE_START======================================================= +# feature-state-management +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +# DB properties +javax.persistence.jdbc.driver=org.mariadb.jdbc.Driver +javax.persistence.jdbc.url=jdbc:mariadb://${{SQL_HOST}}:3306/statemanagement +javax.persistence.jdbc.user=${{SQL_USER}} +javax.persistence.jdbc.password=${{SQL_PASSWORD}} + +# DroolsPDPIntegrityMonitor Properties +hostPort=0.0.0.0:57692 + +#IntegrityMonitor Properties + +# Must be unique across the system +resource.name=pdp1 +# Name of the site in which this node is hosted +site_name=site1 +# Forward Progress Monitor update interval seconds +fp_monitor_interval=30 +# Failed counter threshold before failover +failed_counter_threshold=3 +# Interval between test transactions when no traffic seconds +test_trans_interval=10 +# Interval between writes of the FPC to the DB seconds +write_fpc_interval=5 +# Node type Note: Make sure you don't leave any trailing spaces, or you'll get an 'invalid node type' error! +node_type=pdp_drools +# Dependency groups are groups of resources upon which a node operational state is dependent upon. +# Each group is a comma-separated list of resource names and groups are separated by a semicolon. For example: +# dependency_groups=site_1.astra_1,site_1.astra_2;site_1.brms_1,site_1.brms_2;site_1.logparser_1;site_1.pypdp_1 +dependency_groups= +# When set to true, dependent health checks are performed by using JMX to invoke test() on the dependent. +# The default false is to use state checks for health. +test_via_jmx=true +# This is the max number of seconds beyond which a non incrementing FPC is considered a failure +max_fpc_update_interval=120 +# Run the state audit every 60 seconds (60000 ms). The state audit finds stale DB entries in the +# forwardprogressentity table and marks the node as disabled/failed in the statemanagemententity +# table. NOTE! It will only run on nodes that have a standbystatus = providingservice. +# A value of <= 0 will turn off the state audit. +state_audit_interval_ms=60000 +# The refresh state audit is run every (default) 10 minutes (600000 ms) to clean up any state corruption in the +# DB statemanagemententity table. It only refreshes the DB state entry for the local node. That is, it does not +# refresh the state of any other nodes. A value <= 0 will turn the audit off. Any other value will override +# the default of 600000 ms. +refresh_state_audit_interval_ms=600000 + + +# Repository audit properties + +# Assume it's the releaseRepository that needs to be audited, +# because that's the one BRMGW will publish to. +repository.audit.id=${{releaseRepositoryID}} +repository.audit.url=${{releaseRepositoryUrl}} +repository.audit.username=${{repositoryUsername}} +repository.audit.password=${{repositoryPassword}} +# Flag to control the execution of the subsystemTest for the Nexus Maven repository +repository.audit.is.active=false +repository.audit.ignore.errors=true + +# DB Audit Properties + +# Flag to control the execution of the subsystemTest for the Database +db.audit.is.active=false
\ No newline at end of file diff --git a/feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql b/feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql new file mode 100644 index 00000000..f73f992b --- /dev/null +++ b/feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql @@ -0,0 +1,74 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +set foreign_key_checks=0; + +CREATE TABLE if not exists statemanagement.StateManagementEntity +( + id int not null auto_increment, + resourceName varchar(100) not null, + adminState varchar(20) not null, + opstate varchar(20) not null, + availStatus varchar(20), + standbyStatus varchar(20), + created_date timestamp not null default current_timestamp, + modifiedDate timestamp not null, + primary key(id), + unique key resource(resourceName) +); + +CREATE TABLE if not exists statemanagement.ResourceRegistrationEntity +( + resourceRegistrationId bigint not null auto_increment, + resourceName varchar(100) not null, + resourceURL varchar(255) not null, + site varchar(50), + nodetype varchar(50), + created_date timestamp not null default current_timestamp, + last_updated timestamp not null, + primary key (resourceRegistrationId), + unique key resource (resourceName), + unique key id_resource_url (resourceURL) +); + +CREATE TABLE if not exists statemanagement.ForwardProgressEntity +( + forwardProgressId bigint not null auto_increment, + resourceName varchar(100) not null, + fpc_count bigint not null, + created_date timestamp not null default current_timestamp, + last_updated timestamp not null, + primary key (forwardProgressId), + unique key resource_key (resourceName) +); + +CREATE TABLE if not exists statemanagement.sequence +( +SEQ_NAME VARCHAR(50) NOT NULL, +SEQ_COUNT DECIMAL(38,0), +PRIMARY KEY (SEQ_NAME) +); + +-- Will only insert a record if none exists: +INSERT INTO statemanagement.SEQUENCE (SEQ_NAME,SEQ_COUNT) +SELECT * FROM (SELECT 'SEQ_GEN',1) AS tmp +WHERE NOT EXISTS(select SEQ_NAME from statemanagement.SEQUENCE where SEQ_NAME = 'SEQ_GEN') LIMIT 1; + +set foreign_key_checks=1;
\ No newline at end of file diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java new file mode 100644 index 00000000..a86ac8ef --- /dev/null +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java @@ -0,0 +1,218 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Properties; +import java.util.UUID; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class audits the database + */ +public class DbAudit extends DroolsPDPIntegrityMonitor.AuditBase +{ + // get an instance of logger + private static Logger logger = LoggerFactory.getLogger(DbAudit.class); + // single global instance of this audit object + final static private DbAudit instance = new DbAudit(); + + // This indicates if 'CREATE TABLE IF NOT EXISTS Audit ...' should be + // invoked -- doing this avoids the need to create the table in advance. + static private boolean createTableNeeded = true; + + /** + * @return the single 'DbAudit' instance + */ + static DroolsPDPIntegrityMonitor.AuditBase getInstance() + { + return(instance); + } + + /** + * Constructor - set the name to 'Database' + */ + private DbAudit() + { + super("Database"); + } + + /** + * Invoke the audit + * + * @param properties properties to be passed to the audit + */ + @Override + public void invoke(Properties properties) + { + if(logger.isDebugEnabled()){ + logger.debug("Running 'DbAudit.invoke'"); + } + boolean isActive = true; + String dbAuditIsActive = StateManagementProperties.getProperty("db.audit.is.active"); + if(logger.isDebugEnabled()){ + logger.debug("DbAudit.invoke: dbAuditIsActive = {}", dbAuditIsActive); + } + + if (dbAuditIsActive != null) { + try { + isActive = Boolean.parseBoolean(dbAuditIsActive.trim()); + } catch (NumberFormatException e) { + logger.warn("DbAudit.invoke: Ignoring invalid property: db.audit.is.active = {}", dbAuditIsActive); + } + } + + if(!isActive){ + + logger.info("DbAudit.invoke: exiting because isActive = {}", isActive); + return; + } + + // fetch DB properties from properties file -- they are already known + // to exist, because they were verified by the 'IntegrityMonitor' + // constructor + String url = properties.getProperty(StateManagementProperties.DB_URL); + String user = properties.getProperty(StateManagementProperties.DB_USER); + String password = properties.getProperty(StateManagementProperties.DB_PWD); + + // connection to DB + Connection connection = null; + + // supports SQL operations + PreparedStatement statement = null; + ResultSet rs = null; + + // operation phase currently running -- used to construct an error + // message, if needed + String phase = null; + + try + { + // create connection to DB + phase = "creating connection"; + if(logger.isDebugEnabled()){ + logger.debug("DbAudit: Creating connection to {}", url); + } + + connection = DriverManager.getConnection(url, user, password); + + // create audit table, if needed + if (createTableNeeded) + { + phase = "create table"; + if(logger.isDebugEnabled()){ + logger.info("DbAudit: Creating 'Audit' table, if needed"); + } + statement = connection.prepareStatement + ("CREATE TABLE IF NOT EXISTS Audit (\n" + + " name varchar(64) DEFAULT NULL,\n" + + " UNIQUE KEY name (name)\n" + + ") DEFAULT CHARSET=latin1;"); + statement.execute(); + statement.close(); + createTableNeeded = false; + } + + // insert an entry into the table + phase = "insert entry"; + String key = UUID.randomUUID().toString(); + statement = connection.prepareStatement + ("INSERT INTO Audit (name) VALUES (?)"); + statement.setString(1, key); + statement.executeUpdate(); + statement.close(); + + // fetch the entry from the table + phase = "fetch entry"; + statement = connection.prepareStatement + ("SELECT name FROM Audit WHERE name = ?"); + statement.setString(1, key); + rs = statement.executeQuery(); + if (rs.first()) + { + // found entry + if(logger.isDebugEnabled()){ + logger.debug("DbAudit: Found key {}", rs.getString(1)); + } + } + else + { + logger.error + ("DbAudit: can't find newly-created entry with key {}", key); + setResponse("Can't find newly-created entry"); + } + statement.close(); + + // delete entries from table + phase = "delete entry"; + statement = connection.prepareStatement + ("DELETE FROM Audit WHERE name = ?"); + statement.setString(1, key); + statement.executeUpdate(); + statement.close(); + statement = null; + } + catch (Exception e) + { + String message = "DbAudit: Exception during audit, phase = " + phase; + logger.error(message, e); + setResponse(message); + } + finally + { + if (rs != null) + { + try + { + rs.close(); + } + catch (Exception e) + { + } + } + if (statement != null) + { + try + { + statement.close(); + } + catch (Exception e) + { + } + } + if (connection != null) + { + try + { + connection.close(); + } + catch (Exception e) + { + } + } + } + } +} diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java new file mode 100644 index 00000000..73f6f738 --- /dev/null +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java @@ -0,0 +1,398 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import java.io.File; +import java.io.FileInputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Properties; + +import org.onap.policy.common.im.IntegrityMonitor; +import org.onap.policy.common.im.IntegrityMonitorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.onap.policy.drools.core.PolicyContainer; +import org.onap.policy.drools.http.server.HttpServletServer; +import org.onap.policy.drools.properties.Startable; +import org.onap.policy.drools.utils.PropertyUtil; + +/** + * This class extends 'IntegrityMonitor' for use in the 'Drools PDP' + * virtual machine. The included audits are 'Database' and 'Repository'. + */ +public class DroolsPDPIntegrityMonitor extends IntegrityMonitor +{ + + // get an instance of logger + private static final Logger logger = LoggerFactory.getLogger(DroolsPDPIntegrityMonitor.class); + + // static global instance + static private DroolsPDPIntegrityMonitor im = null; + + // list of audits to run + static private AuditBase[] audits = + new AuditBase[]{DbAudit.getInstance(), RepositoryAudit.getInstance()}; + + static private Properties subsystemTestProperties = null; + + static private final String PROPERTIES_NAME = "feature-state-management.properties"; + /** + * Static initialization -- create Drools Integrity Monitor, and + * an HTTP server to handle REST 'test' requests + */ + static public DroolsPDPIntegrityMonitor init(String configDir) throws Exception + { + + logger.info("init: Entering and invoking PropertyUtil.getProperties() on '{}'", configDir); + + // read in properties + Properties stateManagementProperties = + PropertyUtil.getProperties(configDir + "/" + PROPERTIES_NAME); + + subsystemTestProperties = stateManagementProperties; + + // fetch and verify definitions of some properties + // (the 'IntegrityMonitor' constructor does some additional verification) + + String resourceName = stateManagementProperties.getProperty("resource.name"); + String hostPort = stateManagementProperties.getProperty("hostPort"); + String fpMonitorInterval = stateManagementProperties.getProperty("fp_monitor_interval"); + String failedCounterThreshold = stateManagementProperties.getProperty("failed_counter_threshold"); + String testTransInterval = stateManagementProperties.getProperty("test_trans_interval"); + String writeFpcInterval = stateManagementProperties.getProperty("write_fpc_interval"); + String siteName = stateManagementProperties.getProperty("site_name"); + String nodeType = stateManagementProperties.getProperty("node_type"); + String dependencyGroups = stateManagementProperties.getProperty("dependency_groups"); + String javaxPersistenceJdbcDriver = stateManagementProperties.getProperty("javax.persistence.jdbc.driver"); + String javaxPersistenceJdbcUrl = stateManagementProperties.getProperty("javax.persistence.jdbc.url"); + String javaxPersistenceJdbcUser = stateManagementProperties.getProperty("javax.persistence.jdbc.user"); + String javaxPersistenceJdbcPassword = stateManagementProperties.getProperty("javax.persistence.jdbc.password"); + + if (resourceName == null) + { + logger.error("init: Missing IntegrityMonitor property: 'resource.name'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'resource.name'")); + } + if (hostPort == null) + { + logger.error("init: Missing IntegrityMonitor property: 'hostPort'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'hostPort'")); + } + if (fpMonitorInterval == null) + { + logger.error("init: Missing IntegrityMonitor property: 'fp_monitor_interval'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'fp_monitor_interval'")); + } + if (failedCounterThreshold == null) + { + logger.error("init: Missing IntegrityMonitor property: 'failed_counter_threshold'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'failed_counter_threshold'")); + } + if (testTransInterval == null) + { + logger.error("init: Missing IntegrityMonitor property: 'test_trans_interval'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'test_trans_interval'")); + } + if (writeFpcInterval == null) + { + logger.error("init: Missing IntegrityMonitor property: 'write_fpc_interval'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'write_fpc_interval'")); + } + if (siteName == null) + { + logger.error("init: Missing IntegrityMonitor property: 'site_name'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'site_name'")); + } + if (nodeType == null) + { + logger.error("init: Missing IntegrityMonitor property: 'node_type'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'node_type'")); + } + if (dependencyGroups == null) + { + logger.error("init: Missing IntegrityMonitor property: 'dependency_groups'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'dependency_groups'")); + } + if (javaxPersistenceJdbcDriver == null) + { + logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.driver for xacml DB'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.driver for xacml DB'")); + } + if (javaxPersistenceJdbcUrl == null) + { + logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.url for xacml DB'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.url for xacml DB'")); + } + if (javaxPersistenceJdbcUser == null) + { + logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.user for xacml DB'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.user for xacml DB'")); + } + if (javaxPersistenceJdbcPassword == null) + { + logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.password for xacml DB'"); + throw(new Exception + ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.password' for xacml DB'")); + } + + // Now that we've validated the properties, create Drools Integrity Monitor + // with these properties. + im = new DroolsPDPIntegrityMonitor(resourceName, + stateManagementProperties); + logger.info("init: New DroolsPDPIntegrityMonitor instantiated, hostPort= {}", hostPort); + + // determine host and port for HTTP server + int index = hostPort.lastIndexOf(':'); + InetSocketAddress addr; + + if (index < 0) + { + addr = new InetSocketAddress(Integer.valueOf(hostPort)); + } + else + { + addr = new InetSocketAddress + (hostPort.substring(0, index), + Integer.valueOf(hostPort.substring(index + 1))); + } + + // create http server + try { + logger.info("init: Starting HTTP server, addr= {}", addr); + IntegrityMonitorRestServer server = new IntegrityMonitorRestServer(); + + server.init(stateManagementProperties); + + System.out.println("init: Started server on hostPort=" + hostPort); + } catch (Exception e) { + logger.error("init: Caught Exception attempting to start server on hostPort= {}, message = {}", + hostPort, e.getMessage()); + throw e; + + } + + logger.info("init: Exiting and returning DroolsPDPIntegrityMonitor"); + return im; + } + + /** + * Constructor - pass arguments to superclass, but remember properties + * @param resourceName unique name of this Integrity Monitor + * @param url the JMX URL of the MBean server + * @param properties properties used locally, as well as by + * 'IntegrityMonitor' + * @throws Exception (passed from superclass) + */ + private DroolsPDPIntegrityMonitor(String resourceName, + Properties consolidatedProperties + ) throws Exception { + super(resourceName, consolidatedProperties); + } + + /** + * Run tests (audits) unique to Drools PDP VM (Database + Repository) + */ + @Override + public void subsystemTest() throws IntegrityMonitorException + { + logger.info("DroolsPDPIntegrityMonitor.subsystemTest called"); + + // clear all responses (non-null values indicate an error) + for (AuditBase audit : audits) + { + audit.setResponse(null); + } + + // invoke all of the audits + for (AuditBase audit : audits) + { + try + { + // invoke the audit (responses are stored within the audit object) + audit.invoke(subsystemTestProperties); + } + catch (Exception e) + { + logger.error("{} audit error", audit.getName(), e); + if (audit.getResponse() == null) + { + // if there is no current response, use the exception message + audit.setResponse(e.getMessage()); + } + } + } + + // will contain list of subsystems where the audit failed + String responseMsg = ""; + + // Loop through all of the audits, and see which ones have failed. + // NOTE: response information is stored within the audit objects + // themselves -- only one can run at a time. + for (AuditBase audit : audits) + { + String response = audit.getResponse(); + if (response != null) + { + // the audit has failed -- add subsystem and + // and 'responseValue' with the new information + responseMsg = responseMsg.concat("\n" + audit.getName() + ": " + response); + } + } + + if(!responseMsg.isEmpty()){ + throw new IntegrityMonitorException(responseMsg); + } + } + + /* ============================================================ */ + + /** + * This is the base class for audits invoked in 'subsystemTest' + */ + static public abstract class AuditBase + { + // name of the audit + protected String name; + + // non-null indicates the error response + protected String response; + + /** + * Constructor - initialize the name, and clear the initial response + * @param name name of the audit + */ + public AuditBase(String name) + { + this.name = name; + this.response = null; + } + + /** + * @return the name of this audit + */ + public String getName() + { + return(name); + } + + /** + * @return the response String (non-null indicates the error message) + */ + public String getResponse() + { + return(response); + } + + /** + * Set the response string to the specified value + * @param value the new value of the response string (null = no errors) + */ + public void setResponse(String value) + { + response = value; + } + + /** + * Abstract method to invoke the audit + * @param persistenceProperties Used for DB access + * @throws Exception passed in by the audit + */ + abstract void invoke(Properties persistenceProperties) throws Exception; + } + + public static class IntegrityMonitorRestServer implements Startable { + protected volatile HttpServletServer server = null; + protected volatile Properties integrityMonitorRestServerProperties = null; + + public void init(Properties props) { + this.integrityMonitorRestServerProperties = props; + this.start(); + } + + @Override + public boolean start() throws IllegalStateException { + try { + ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(integrityMonitorRestServerProperties); + + if (!servers.isEmpty()) { + server = servers.get(0); + + try { + server.waitedStart(5); + } catch (Exception e) { + e.printStackTrace(); + } + } + } catch (Exception e) { + return false; + } + + return true; + } + + @Override + public boolean stop() throws IllegalStateException { + try { + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + + return true; + } + + @Override + public void shutdown() throws IllegalStateException { + this.stop(); + } + + @Override + public synchronized boolean isAlive() { + return this.integrityMonitorRestServerProperties != null; + } + } + + public static DroolsPDPIntegrityMonitor getInstance() throws Exception{ + if(logger.isDebugEnabled()){ + logger.debug("getInstance() called"); + } + if (im == null) { + String msg = "No DroolsPDPIntegrityMonitor instance exists." + + " Please use the method DroolsPDPIntegrityMonitor init(String configDir)"; + throw new Exception(msg); + }else{ + return im; + } + } +} diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java new file mode 100644 index 00000000..f5024299 --- /dev/null +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java @@ -0,0 +1,110 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.core.Response; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + +@Api(value = "test") + @Path("/") +public class IntegrityMonitorRestManager { + private static Logger logger = LoggerFactory.getLogger(IntegrityMonitorRestManager.class); + private DroolsPDPIntegrityMonitor im; + + /** + * Test interface for Integrity Monitor + * + * @return Exception message if exception, otherwise empty + */ + @ApiOperation( + value = "Test endpoint for integrity monitor", + notes = "The TEST command is used to request data from a subcomponent " + + "instance that can be used to determine its operational state. " + + "A 200/success response status code should be returned if the " + + "subcomponent instance is functioning properly and able to respond to requests.", + response = String.class) + @ApiResponses(value = { + @ApiResponse( + code = 200, + message = "Integrity monitor sanity check passed"), + @ApiResponse( + code = 500, + message = "Integrity monitor sanity check encountered an exception. This can indicate operational state disabled or administrative state locked") + }) + @GET + @Path("test") + public Response test() { + logger.error("integrity monitor /test accessed"); + // The responses are stored within the audit objects, so we need to + // invoke the audits and get responses before we handle another + // request. + synchronized (IntegrityMonitorRestManager.class) { + // will include messages associated with subsystem failures + StringBuilder body = new StringBuilder(); + + // 200=SUCCESS, 500=failure + int responseValue = 200; + + if (im == null) { + try { + im = DroolsPDPIntegrityMonitor.getInstance(); + } catch (Exception e) { + logger.error("IntegrityMonitorRestManager: test() interface caught an exception", e); + e.printStackTrace(); + + body.append("\nException: " + e + "\n"); + responseValue = 500; + } + } + + if (im != null) { + try { + // call 'IntegrityMonitor.evaluateSanity()' + im.evaluateSanity(); + } catch (Exception e) { + // this exception isn't coming from one of the audits, + // because those are caught in 'subsystemTest()' + logger.error("DroolsPDPIntegrityMonitor.evaluateSanity()", e); + + // include exception in HTTP response + body.append("\nException: " + e + "\n"); + responseValue = 500; + } + } + + // send response, including the contents of 'body' + // (which is empty if everything is successful) + if (responseValue == 200) + return Response.status(Response.Status.OK).build(); + else + return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(body.toString()).build(); + } + } +} diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java new file mode 100644 index 00000000..6171572a --- /dev/null +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java @@ -0,0 +1,552 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.LinkedList; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class audits the Maven repository + */ +public class RepositoryAudit extends DroolsPDPIntegrityMonitor.AuditBase +{ + private static final long DEFAULT_TIMEOUT = 60; // timeout in 60 seconds + + // get an instance of logger + private static Logger logger = LoggerFactory.getLogger(RepositoryAudit.class); + // single global instance of this audit object + static private RepositoryAudit instance = new RepositoryAudit(); + + /** + * @return the single 'RepositoryAudit' instance + */ + static DroolsPDPIntegrityMonitor.AuditBase getInstance() + { + return(instance); + } + + /** + * Constructor - set the name to 'Repository' + */ + private RepositoryAudit() + { + super("Repository"); + } + + /** + * Invoke the audit + * + * @param properties properties to be passed to the audit + */ + @Override + public void invoke(Properties properties) + throws IOException, InterruptedException + { + if(logger.isDebugEnabled()){ + logger.debug("Running 'RepositoryAudit.invoke'"); + } + + boolean isActive = true; + boolean ignoreErrors = true; // ignore errors by default + String repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active"); + String repoAuditIgnoreErrors = + StateManagementProperties.getProperty("repository.audit.ignore.errors"); + logger.debug("RepositoryAudit.invoke: repoAuditIsActive = {}" + + ", repoAuditIgnoreErrors = {}",repoAuditIsActive, repoAuditIgnoreErrors); + + if (repoAuditIsActive != null) { + try { + isActive = Boolean.parseBoolean(repoAuditIsActive.trim()); + } catch (NumberFormatException e) { + logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}", repoAuditIsActive); + } + } + + if(!isActive){ + logger.info("RepositoryAudit.invoke: exiting because isActive = {}", isActive); + return; + } + + if (repoAuditIgnoreErrors != null) + { + try + { + ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim()); + } + catch (NumberFormatException e) + { + ignoreErrors = true; + logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}", repoAuditIgnoreErrors); + } + }else{ + ignoreErrors = true; + } + + // Fetch repository information from 'IntegrityMonitorProperties' + String repositoryId = + StateManagementProperties.getProperty("repository.audit.id"); + String repositoryUrl = + StateManagementProperties.getProperty("repository.audit.url"); + String repositoryUsername = + StateManagementProperties.getProperty("repository.audit.username"); + String repositoryPassword = + StateManagementProperties.getProperty("repository.audit.password"); + boolean upload = + (repositoryId != null && repositoryUrl != null + && repositoryUsername != null && repositoryPassword != null); + + // used to incrementally construct response as problems occur + // (empty = no problems) + StringBuilder response = new StringBuilder(); + + long timeoutInSeconds = DEFAULT_TIMEOUT; + String timeoutString = + StateManagementProperties.getProperty("repository.audit.timeout"); + if (timeoutString != null && !timeoutString.isEmpty()) + { + try + { + timeoutInSeconds = Long.valueOf(timeoutString); + } + catch (NumberFormatException e) + { + logger.error + ("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'", timeoutString, e); + if (!ignoreErrors) + { + response.append("Invalid 'repository.audit.timeout' value: '") + .append(timeoutString).append("'\n"); + setResponse(response.toString()); + } + } + } + + // artifacts to be downloaded + LinkedList<Artifact> artifacts = new LinkedList<>(); + + /* + * 1) create temporary directory + */ + Path dir = Files.createTempDirectory("auditRepo"); + logger.info("RepositoryAudit: temporary directory = {}", dir); + + // nested 'pom.xml' file and 'repo' directory + Path pom = dir.resolve("pom.xml"); + Path repo = dir.resolve("repo"); + + /* + * 2) Create test file, and upload to repository + * (only if repository information is specified) + */ + String groupId = null; + String artifactId = null; + String version = null; + if (upload) + { + groupId = "org.onap.policy.audit"; + artifactId = "repository-audit"; + version = "0." + System.currentTimeMillis(); + + if (repositoryUrl.toLowerCase().contains("snapshot")) + { + // use SNAPSHOT version + version += "-SNAPSHOT"; + } + + // create text file to write + FileOutputStream fos = + new FileOutputStream(dir.resolve("repository-audit.txt").toFile()); + try + { + fos.write(version.getBytes()); + } + finally + { + fos.close(); + } + + // try to install file in repository + if (runProcess + (timeoutInSeconds, dir.toFile(), null, + "mvn", "deploy:deploy-file", + "-DrepositoryId=" + repositoryId, + "-Durl=" + repositoryUrl, + "-Dfile=repository-audit.txt", + "-DgroupId=" + groupId, + "-DartifactId=" + artifactId, + "-Dversion=" + version, + "-Dpackaging=txt", + "-DgeneratePom=false") != 0) + { + logger.error + ("RepositoryAudit: 'mvn deploy:deploy-file' failed"); + if (!ignoreErrors) + { + response.append("'mvn deploy:deploy-file' failed\n"); + setResponse(response.toString()); + } + } + else + { + logger.info + ("RepositoryAudit: 'mvn deploy:deploy-file succeeded"); + + // we also want to include this new artifact in the download + // test (steps 3 and 4) + artifacts.add(new Artifact(groupId, artifactId, version, "txt")); + } + } + + /* + * 3) create 'pom.xml' file in temporary directory + */ + artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2")); + + StringBuilder sb = new StringBuilder(); + sb.append + ("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n" + + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n" + + "\n" + + " <modelVersion>4.0.0</modelVersion>\n" + + " <groupId>empty</groupId>\n" + + " <artifactId>empty</artifactId>\n" + + " <version>1.0-SNAPSHOT</version>\n" + + " <packaging>pom</packaging>\n" + + "\n" + + " <build>\n" + + " <plugins>\n" + + " <plugin>\n" + + " <groupId>org.apache.maven.plugins</groupId>\n" + + " <artifactId>maven-dependency-plugin</artifactId>\n" + + " <version>2.10</version>\n" + + " <executions>\n" + + " <execution>\n" + + " <id>copy</id>\n" + + " <goals>\n" + + " <goal>copy</goal>\n" + + " </goals>\n" + + " <configuration>\n" + + " <localRepositoryDirectory>") + .append(repo) + .append("</localRepositoryDirectory>\n") + .append(" <artifactItems>\n"); + for (Artifact artifact : artifacts) + { + // each artifact results in an 'artifactItem' element + sb.append + (" <artifactItem>\n" + + " <groupId>") + .append(artifact.groupId) + .append + ("</groupId>\n" + + " <artifactId>") + .append(artifact.artifactId) + .append + ("</artifactId>\n" + + " <version>") + .append(artifact.version) + .append + ("</version>\n" + + " <type>") + .append(artifact.type) + .append + ("</type>\n" + + " </artifactItem>\n"); + } + sb.append + (" </artifactItems>\n" + + " </configuration>\n" + + " </execution>\n" + + " </executions>\n" + + " </plugin>\n" + + " </plugins>\n" + + " </build>\n" + + "</project>\n"); + FileOutputStream fos = new FileOutputStream(pom.toFile()); + try + { + fos.write(sb.toString().getBytes()); + } + finally + { + fos.close(); + } + + /* + * 4) Invoke external 'mvn' process to do the downloads + */ + + // output file = ${dir}/out (this supports step '4a') + File output = dir.resolve("out").toFile(); + + // invoke process, and wait for response + int rval = runProcess + (timeoutInSeconds, dir.toFile(), output, "mvn", "compile"); + logger.info("RepositoryAudit: 'mvn' return value = {}", rval); + if (rval != 0) + { + logger.error + ("RepositoryAudit: 'mvn compile' invocation failed"); + if (!ignoreErrors) + { + response.append("'mvn compile' invocation failed\n"); + setResponse(response.toString()); + } + } + + /* + * 4a) Check attempted and successful downloads from output file + * Note: at present, this step just generates log messages, + * but doesn't do any verification. + */ + if (rval == 0) + { + // place output in 'fileContents' (replacing the Return characters + // with Newline) + byte[] outputData = new byte[(int)output.length()]; + FileInputStream fis = new FileInputStream(output); + fis.read(outputData); + String fileContents = new String(outputData).replace('\r','\n'); + fis.close(); + + // generate log messages from 'Downloading' and 'Downloaded' + // messages within the 'mvn' output + int index = 0; + while ((index = fileContents.indexOf("\nDown", index)) > 0) + { + index += 5; + if (fileContents.regionMatches(index, "loading: ", 0, 9)) + { + index += 9; + int endIndex = fileContents.indexOf('\n', index); + logger.info + ("RepositoryAudit: Attempted download: '{}'", fileContents.substring(index, endIndex)); + index = endIndex; + } + else if (fileContents.regionMatches(index, "loaded: ", 0, 8)) + { + index += 8; + int endIndex = fileContents.indexOf(' ', index); + logger.info + ("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex)); + index = endIndex; + } + } + } + + /* + * 5) Check the contents of the directory to make sure the downloads + * were successful + */ + for (Artifact artifact : artifacts) + { + if (repo.resolve(artifact.groupId.replace('.','/')) + .resolve(artifact.artifactId) + .resolve(artifact.version) + .resolve(artifact.artifactId + "-" + artifact.version + "." + + artifact.type).toFile().exists()) + { + // artifact exists, as expected + logger.info("RepositoryAudit: {} : exists", artifact.toString()); + } + else + { + // Audit ERROR: artifact download failed for some reason + logger.error("RepositoryAudit: {}: does not exist", artifact.toString()); + if (!ignoreErrors) + { + response.append("Failed to download artifact: ") + .append(artifact).append('\n'); + setResponse(response.toString()); + } + } + } + + /* + * 6) Use 'curl' to delete the uploaded test file + * (only if repository information is specified) + */ + if (upload) + { + if (runProcess + (timeoutInSeconds, dir.toFile(), null, + "curl", + "--request", "DELETE", + "--user", repositoryUsername + ":" + repositoryPassword, + (repositoryUrl + "/" + groupId.replace('.', '/') + "/" + + artifactId + "/" + version)) + != 0) + { + logger.error + ("RepositoryAudit: delete of uploaded artifact failed"); + if (!ignoreErrors) + { + response.append("delete of uploaded artifact failed\n"); + setResponse(response.toString()); + } + } + else + { + logger.info + ("RepositoryAudit: delete of uploaded artifact succeeded"); + artifacts.add(new Artifact(groupId, artifactId, version, "txt")); + } + } + + /* + * 7) Remove the temporary directory + */ + Files.walkFileTree + (dir, + new SimpleFileVisitor<Path>() + { + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) + { + // logger.info("RepositoryAudit: Delete " + file); + file.toFile().delete(); + return(FileVisitResult.CONTINUE); + } + + public FileVisitResult postVisitDirectory(Path file, IOException e) + throws IOException + { + if (e == null) + { + // logger.info("RepositoryAudit: Delete " + file); + file.toFile().delete(); + return(FileVisitResult.CONTINUE); + } + else + { + throw(e); + } + } + }); + } + + /** + * Run a process, and wait for the response + * + * @param timeoutInSeconds the number of seconds to wait for the + * process to terminate + * @param directory the execution directory of the process + * (null = current directory) + * @param stdout the file to contain the standard output + * (null = discard standard output) + * @param command command and arguments + * @return the return value of the process + * @throws IOException, InterruptedException + */ + static int runProcess(long timeoutInSeconds, + File directory, File stdout, String... command) + throws IOException, InterruptedException + { + ProcessBuilder pb = new ProcessBuilder(command); + if (directory != null) + { + pb.directory(directory); + } + if (stdout != null) + { + pb.redirectOutput(stdout); + } + + Process process = pb.start(); + if (process.waitFor(timeoutInSeconds, TimeUnit.SECONDS)) + { + // process terminated before the timeout + return(process.exitValue()); + } + + // process timed out -- kill it, and return -1 + process.destroyForcibly(); + return(-1); + } + + /* ============================================================ */ + + /** + * An instance of this class exists for each artifact that we are trying + * to download. + */ + static class Artifact + { + String groupId, artifactId, version, type; + + /** + * Constructor - populate the 'Artifact' instance + * + * @param groupId groupId of artifact + * @param artifactId artifactId of artifact + * @param version version of artifact + * @param type type of the artifact (e.g. "jar") + */ + Artifact(String groupId, String artifactId, String version, String type) + { + this.groupId = groupId; + this.artifactId = artifactId; + this.version = version; + this.type = type; + } + + /** + * Constructor - populate an 'Artifact' instance + * + * @param artifact a string of the form: + * "<groupId>/<artifactId>/<version>[/<type>]" + * @throws IllegalArgumentException if 'artifact' has the incorrect format + */ + Artifact(String artifact) + { + String[] segments = artifact.split("/"); + if (segments.length != 4 && segments.length != 3) + { + throw(new IllegalArgumentException("groupId/artifactId/version/type")); + } + groupId = segments[0]; + artifactId = segments[1]; + version = segments[2]; + type = (segments.length == 4 ? segments[3] : "jar"); + } + + /** + * @return the artifact id in the form: + * "<groupId>/<artifactId>/<version>/<type>" + */ + public String toString() + { + return(groupId + "/" + artifactId + "/" + version + "/" + type); + } + } +} diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java new file mode 100644 index 00000000..6d47039e --- /dev/null +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java @@ -0,0 +1,275 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import java.io.IOException; +import java.util.Observer; +import java.util.Properties; + +import org.onap.policy.drools.statemanagement.StateManagementFeatureAPI; +import org.onap.policy.common.im.StandbyStatusException; +import org.onap.policy.common.im.StateManagement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.onap.policy.drools.core.PolicySessionFeatureAPI; +import org.onap.policy.drools.features.PolicyEngineFeatureAPI; +import org.onap.policy.drools.utils.PropertyUtil; + +/** + * If this feature is supported, there is a single instance of it. + * It adds persistence to Drools sessions, but it is also intertwined with + * active/standby state management and IntegrityMonitor. For now, they are + * all treated as a single feature, but it would be nice to separate them. + * + * The bulk of the code here was once in other classes, such as + * 'PolicyContainer' and 'Main'. It was moved here as part of making this + * a separate optional feature. + */ + +public class StateManagementFeature implements StateManagementFeatureAPI, + PolicySessionFeatureAPI, PolicyEngineFeatureAPI +{ + // get an instance of logger + private static final Logger logger = + LoggerFactory.getLogger(StateManagementFeature.class); + + private DroolsPDPIntegrityMonitor droolsPdpIntegrityMonitor = null; + private StateManagement stateManagement = null; + + /**************************/ + /* 'FeatureAPI' interface */ + /**************************/ + + public StateManagementFeature(){ + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature() constructor"); + } + } + + @Override + public void globalInit(String args[], String configDir) + { + // Initialization code associated with 'PolicyContainer' + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature.globalInit({}) entry", configDir); + } + + try + { + droolsPdpIntegrityMonitor = DroolsPDPIntegrityMonitor.init(configDir); + } + catch (Exception e) + { + if(logger.isDebugEnabled()){ + logger.debug("DroolsPDPIntegrityMonitor initialization exception: ", e); + } + logger.error("DroolsPDPIntegrityMonitor.init()", e); + } + + initializeProperties(configDir); + + //At this point the DroolsPDPIntegrityMonitor instance must exist. Let's check it. + try { + droolsPdpIntegrityMonitor = DroolsPDPIntegrityMonitor.getInstance(); + stateManagement = droolsPdpIntegrityMonitor.getStateManager(); + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature.globalInit(): " + + "stateManagement.getAdminState(): {}", stateManagement.getAdminState()); + } + if(stateManagement == null){ + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature.globalInit(): stateManagement is NULL!"); + } + } + } catch (Exception e1) { + String msg = " \n"; + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature.globalInit(): DroolsPDPIntegrityMonitor" + + " initialization failed with exception:", e1); + } + logger.error("DroolsPDPIntegrityMonitor.init(): StateManagementFeature startup failed " + + "to get DroolsPDPIntegrityMonitor instance:", e1); + e1.printStackTrace(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void addObserver(Observer stateChangeObserver) { + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature.addObserver() entry\n" + + "StateManagementFeature.addObserver(): " + + "stateManagement.getAdminState(): {}", stateManagement.getAdminState()); + } + stateManagement.addObserver(stateChangeObserver); + if(logger.isDebugEnabled()){ + logger.debug("StateManagementFeature.addObserver() exit"); + } + } + + /** + * {@inheritDoc} + */ + @Override + public String getAdminState() { + return stateManagement.getAdminState(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getOpState() { + return stateManagement.getOpState(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getAvailStatus() { + return stateManagement.getAvailStatus(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getStandbyStatus() { + return stateManagement.getStandbyStatus(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getStandbyStatus(String resourceName) { + return stateManagement.getStandbyStatus(resourceName); + } + + /** + * {@inheritDoc} + */ + @Override + public void disableFailed(String resourceName) throws Exception { + stateManagement.disableFailed(resourceName); + + } + + /** + * {@inheritDoc} + */ + @Override + public void disableFailed() throws Exception { + stateManagement.disableFailed(); + } + + /** + * {@inheritDoc} + */ + @Override + public void promote() throws StandbyStatusException, Exception { + stateManagement.promote(); + } + + /** + * {@inheritDoc} + */ + @Override + public void demote() throws Exception { + stateManagement.demote(); + } + + /** + * {@inheritDoc} + */ + @Override + public String getResourceName() { + return StateManagementProperties.getProperty(StateManagementProperties.NODE_NAME); + } + + /** + * {@inheritDoc} + * @return + */ + @Override + public boolean lock(){ + try{ + stateManagement.lock(); + }catch(Exception e){ + logger.error("StateManagementFeature.lock() failed with exception: {}", e); + return false; + } + return true; + } + + /** + * {@inheritDoc} + * @throws Exception + */ + @Override + public boolean unlock(){ + try{ + stateManagement.unlock(); + }catch(Exception e){ + logger.error("StateManagementFeature.unlock() failed with exception: {}", e); + return false; + } + return true; + } + + /** + * {@inheritDoc} + * @throws Exception + */ + @Override + public boolean isLocked(){ + String admin = stateManagement.getAdminState(); + if(admin.equals(StateManagement.LOCKED)){ + return true; + }else{ + return false; + } + } + + @Override + public int getSequenceNumber() { + return SEQ_NUM; + } + + /** + * Read in the properties and initialize the StateManagementProperties. + */ + private static void initializeProperties(String configDir) + { + //Get the state management properties + try { + Properties pIm = + PropertyUtil.getProperties(configDir + "/feature-state-management.properties"); + StateManagementProperties.initProperties(pIm); + logger.info("initializeProperties: resourceName= {}", StateManagementProperties.getProperty(StateManagementProperties.NODE_NAME)); + } catch (IOException e1) { + logger.error("initializeProperties", e1); + } + } +} diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java new file mode 100644 index 00000000..c8e17ea9 --- /dev/null +++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * feature-state-management + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement; + +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StateManagementProperties { + // get an instance of logger + private static final Logger logger = LoggerFactory.getLogger(StateManagementProperties.class); + + public static final String NODE_NAME = "resource.name"; + public static final String SITE_NAME = "site_name"; + + public static final String DB_DRIVER = "javax.persistence.jdbc.driver"; + public static final String DB_URL = "javax.persistence.jdbc.url"; + public static final String DB_USER = "javax.persistence.jdbc.user"; + public static final String DB_PWD = "javax.persistence.jdbc.password"; + + private static Properties properties = null; + /* + * Initialize the parameter values from the feature-state-management.properties file values + * + * This is designed so that the Properties object is obtained from the feature-state-management.properties + * file and then is passed to this method to initialize the value of the parameters. + * This allows the flexibility of JUnit tests using getProperties(filename) to get the + * properties while runtime methods can use getPropertiesFromClassPath(filename). + * + */ + public static void initProperties (Properties prop){ + logger.info("StateManagementProperties.initProperties(Properties): entry"); + logger.info("\n\nStateManagementProperties.initProperties: Properties = \n{}\n\n", prop); + + properties = prop; + } + + public static String getProperty(String key){ + return properties.getProperty(key); + } + + public static Properties getProperties() { + return properties; + } +} diff --git a/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI new file mode 100644 index 00000000..9ffef571 --- /dev/null +++ b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI @@ -0,0 +1 @@ +org.onap.policy.drools.statemanagement.StateManagementFeature
\ No newline at end of file diff --git a/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI new file mode 100644 index 00000000..74d0b995 --- /dev/null +++ b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI @@ -0,0 +1 @@ +org.onap.policy.drools.statemanagement.StateManagementFeature diff --git a/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI new file mode 100644 index 00000000..74d0b995 --- /dev/null +++ b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI @@ -0,0 +1 @@ +org.onap.policy.drools.statemanagement.StateManagementFeature diff --git a/feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java b/feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java new file mode 100644 index 00000000..e458dcea --- /dev/null +++ b/feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java @@ -0,0 +1,245 @@ +/*- + * ============LICENSE_START======================================================= + * policy-persistence + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.statemanagement.test; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Properties; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.EntityTransaction; +import javax.persistence.Persistence; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.onap.policy.common.im.StateManagement; +import org.onap.policy.drools.core.PolicySessionFeatureAPI; +import org.onap.policy.drools.statemanagement.StateManagementFeatureAPI; +import org.onap.policy.drools.statemanagement.StateManagementProperties; + +public class StateManagementTest { + + // get an instance of logger + private static Logger logger = LoggerFactory.getLogger(StateManagementTest.class); + + /* + * Sleep 5 seconds after each test to allow interrupt (shutdown) recovery. + */ + + private long interruptRecoveryTime = 1000; + + StateManagementFeatureAPI stateManagementFeature; + + /* + * All you need to do here is create an instance of StateManagementFeature class. Then, + * check it initial state and the state after diableFailed() and promote() + */ + + @BeforeClass + public static void setUpClass() throws Exception { + + logger.info("setUpClass: Entering"); + + String userDir = System.getProperty("user.dir"); + logger.debug("setUpClass: userDir=" + userDir); + System.setProperty("com.sun.management.jmxremote.port", "9980"); + System.setProperty("com.sun.management.jmxremote.authenticate","false"); + + initializeDb(); + + logger.info("setUpClass: Exiting"); + } + + @AfterClass + public static void tearDownClass() throws Exception { + + } + + @Before + public void setUp() throws Exception { + + } + + @After + public void tearDown() throws Exception { + + } + + + /* + * Verifies that StateManagementFeature starts and runs successfully. + */ + + //@Ignore + @Test + public void testStateManagementOperation() throws Exception { + + logger.debug("\n\ntestStateManagementOperation: Entering\n\n"); + + logger.debug("testStateManagementOperation: Reading StateManagementProperties"); + + String configDir = "src/test/resources"; + + Properties fsmProperties = new Properties(); + fsmProperties.load(new FileInputStream(new File( + configDir + "/feature-state-management.properties"))); + String thisPdpId = fsmProperties + .getProperty(StateManagementProperties.NODE_NAME); + + StateManagementFeatureAPI stateManagementFeature = null; + for (StateManagementFeatureAPI feature : StateManagementFeatureAPI.impl.getList()) + { + ((PolicySessionFeatureAPI) feature).globalInit(null, configDir); + stateManagementFeature = feature; + logger.debug("testStateManagementOperation stateManagementFeature.getResourceName(): " + stateManagementFeature.getResourceName()); + break; + } + if(stateManagementFeature == null){ + String msg = "testStateManagementOperation failed to initialize. " + + "Unable to get instance of StateManagementFeatureAPI " + + "with resourceID: " + thisPdpId; + logger.error(msg); + logger.debug(msg); + } + + Thread.sleep(interruptRecoveryTime); + + String admin = stateManagementFeature.getAdminState(); + String oper = stateManagementFeature.getOpState(); + String avail = stateManagementFeature.getAvailStatus(); + String standby = stateManagementFeature.getStandbyStatus(); + + logger.debug("admin = {}", admin); + System.out.println("admin = " + admin); + logger.debug("oper = {}", oper); + System.out.println("oper = " + oper); + logger.debug("avail = {}", avail); + System.out.println("avail = " + avail); + logger.debug("standby = {}", standby); + System.out.println("standby = " + standby); + + assertTrue("Admin state not unlocked after initialization", admin.equals(StateManagement.UNLOCKED)); + assertTrue("Operational state not enabled after initialization", oper.equals(StateManagement.ENABLED)); + + try{ + stateManagementFeature.disableFailed(); + }catch(Exception e){ + logger.error(e.getMessage()); + System.out.println(e.getMessage()); + assertTrue(e.getMessage(), false); + } + + Thread.sleep(interruptRecoveryTime); + + admin = stateManagementFeature.getAdminState(); + oper = stateManagementFeature.getOpState(); + avail = stateManagementFeature.getAvailStatus(); + standby = stateManagementFeature.getStandbyStatus(); + + logger.debug("after disableFailed()"); + System.out.println("after disableFailed()"); + logger.debug("admin = {}", admin); + System.out.println("admin = " + admin); + logger.debug("oper = {}", oper); + System.out.println("oper = " + oper); + logger.debug("avail = {}", avail); + System.out.println("avail = " + avail); + logger.debug("standby = {}", standby); + System.out.println("standby = " + standby); + + assertTrue("Operational state not disabled after disableFailed()", oper.equals(StateManagement.DISABLED)); + assertTrue("Availability status not failed after disableFailed()", avail.equals(StateManagement.FAILED)); + + + try{ + stateManagementFeature.promote(); + }catch(Exception e){ + logger.debug(e.getMessage()); + System.out.println(e.getMessage()); + } + + Thread.sleep(interruptRecoveryTime); + + admin = stateManagementFeature.getAdminState(); + oper = stateManagementFeature.getOpState(); + avail = stateManagementFeature.getAvailStatus(); + standby = stateManagementFeature.getStandbyStatus(); + + logger.debug("after promote()"); + System.out.println("after promote()"); + logger.debug("admin = {}", admin); + System.out.println("admin = " + admin); + logger.debug("oper = {}", oper); + System.out.println("oper = " + oper); + logger.debug("avail = {}", avail); + System.out.println("avail = " + avail); + logger.debug("standby = {}", standby); + System.out.println("standby = " + standby); + + assertTrue("Standby status not coldstandby after promote()", standby.equals(StateManagement.COLD_STANDBY)); + + logger.debug("\n\ntestStateManagementOperation: Exiting\n\n"); + } + + /* + * This method initializes and cleans the DB so that PDP-D will be able to + * store fresh records in the DB. + */ + + public static void initializeDb(){ + + logger.debug("initializeDb: Entering"); + + Properties cleanProperties = new Properties(); + cleanProperties.put(StateManagementProperties.DB_DRIVER,"org.h2.Driver"); + cleanProperties.put(StateManagementProperties.DB_URL, "jdbc:h2:file:./sql/statemanagement"); + cleanProperties.put(StateManagementProperties.DB_USER, "sa"); + cleanProperties.put(StateManagementProperties.DB_PWD, ""); + + EntityManagerFactory emf = Persistence.createEntityManagerFactory("junitPU", cleanProperties); + + EntityManager em = emf.createEntityManager(); + // Start a transaction + EntityTransaction et = em.getTransaction(); + + et.begin(); + + // Clean up the DB + em.createQuery("Delete from StateManagementEntity").executeUpdate(); + em.createQuery("Delete from ForwardProgressEntity").executeUpdate(); + em.createQuery("Delete from ResourceRegistrationEntity").executeUpdate(); + + // commit transaction + et.commit(); + em.close(); + + logger.debug("initializeDb: Exiting"); + } +} diff --git a/feature-state-management/src/test/resources/META-INF/persistence.xml b/feature-state-management/src/test/resources/META-INF/persistence.xml new file mode 100644 index 00000000..d26ab443 --- /dev/null +++ b/feature-state-management/src/test/resources/META-INF/persistence.xml @@ -0,0 +1,39 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + feature-state-management + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<persistence version="2.1" + xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd"> + + <persistence-unit name="junitPU" transaction-type="RESOURCE_LOCAL"> + <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider> + <class>org.onap.policy.common.im.jpa.StateManagementEntity</class> + <class>org.onap.policy.common.im.jpa.ForwardProgressEntity</class> + <class>org.onap.policy.common.im.jpa.ResourceRegistrationEntity</class> + <properties> + <property name="javax.persistence.schema-generation.database.action" value="drop-and-create"/> + <property name="javax.persistence.schema-generation.scripts.action" value="drop-and-create"/> + <property name="javax.persistence.schema-generation.scripts.create-target" value="./sql/generatedCreateStateManagement.ddl"/> + <property name="javax.persistence.schema-generation.scripts.drop-target" value="./sql/generatedDropStateManagement.ddl"/> + </properties> + </persistence-unit> + +</persistence> diff --git a/feature-state-management/src/test/resources/feature-state-management.properties b/feature-state-management/src/test/resources/feature-state-management.properties new file mode 100644 index 00000000..7b4a697e --- /dev/null +++ b/feature-state-management/src/test/resources/feature-state-management.properties @@ -0,0 +1,74 @@ +### +# ============LICENSE_START======================================================= +# feature-state-management +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +# DB properties +javax.persistence.jdbc.driver = org.h2.Driver +javax.persistence.jdbc.url = jdbc:h2:file:./sql/statemanagement +javax.persistence.jdbc.user = sa +javax.persistence.jdbc.password = + +# DroolsPDPIntegrityMonitor Properties +hostPort = 0.0.0.0:57692 + +#IntegrityMonitor Properties + +# Must be unique across the system +resource.name=pdp1 +# Name of the site in which this node is hosted +site_name = pdp_1 +# Forward Progress Monitor update interval seconds +fp_monitor_interval = 30 +# Failed counter threshold before failover +failed_counter_threshold = 3 +# Interval between test transactions when no traffic seconds +test_trans_interval = 10 +# Interval between writes of the FPC to the DB seconds +write_fpc_interval = 5 +# Node type Note: Make sure you don't leave any trailing spaces, or you'll get an 'invalid node type' error! +node_type = pdp_drools +# Dependency groups are groups of resources upon which a node operational state is dependent upon. +# Each group is a comma-separated list of resource names and groups are separated by a semicolon. For example: +# dependency_groups=site_1.astra_1,site_1.astra_2;site_1.brms_1,site_1.brms_2;site_1.logparser_1;site_1.pypdp_1 +dependency_groups= +# When set to true, dependent health checks are performed by using JMX to invoke test() on the dependent. +# The default false is to use state checks for health. +test_via_jmx=true +# This is the max number of seconds beyond which a non incrementing FPC is considered a failure +max_fpc_update_interval=120 +# Run the state audit every 60 seconds (60000 ms). The state audit finds stale DB entries in the +# forwardprogressentity table and marks the node as disabled/failed in the statemanagemententity +# table. NOTE! It will only run on nodes that have a standbystatus = providingservice. +# A value of <= 0 will turn off the state audit. +state_audit_interval_ms=60000 +# The refresh state audit is run every (default) 10 minutes (600000 ms) to clean up any state corruption in the +# DB statemanagemententity table. It only refreshes the DB state entry for the local node. That is, it does not +# refresh the state of any other nodes. A value <= 0 will turn the audit off. Any other value will override +# the default of 600000 ms. +refresh_state_audit_interval_ms=600000 + + +# Repository audit properties +# Flag to control the execution of the subsystemTest for the Nexus Maven repository +repository.audit.is.active=false +repository.audit.ignore.errors=true + +# DB Audit Properties +# Flag to control the execution of the subsystemTest for the Database +db.audit.is.active=false diff --git a/feature-state-management/src/test/resources/logback-test.xml b/feature-state-management/src/test/resources/logback-test.xml new file mode 100644 index 00000000..58cabf98 --- /dev/null +++ b/feature-state-management/src/test/resources/logback-test.xml @@ -0,0 +1,47 @@ +<!-- + ============LICENSE_START======================================================= + feature-state-management + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<!-- Controls the output of logs for JUnit tests --> + +<configuration> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <Pattern> + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n + </Pattern> + </encoder> + </appender> + <appender name="FILE" class="ch.qos.logback.core.FileAppender"> + <file>logs/debug.log</file> + <encoder> + <Pattern> + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n + </Pattern> + </encoder> + </appender> + + <root level="debug"> + <appender-ref ref="STDOUT" /> + <appender-ref ref="FILE" /> + </root> + +</configuration> + diff --git a/packages/install/pom.xml b/packages/install/pom.xml index 2b3a136d..7a765c3a 100644 --- a/packages/install/pom.xml +++ b/packages/install/pom.xml @@ -95,6 +95,12 @@ <version>${project.version}</version> <type>zip</type> </dependency> + <dependency> + <groupId>org.onap.policy.drools-pdp</groupId> + <artifactId>feature-state-management</artifactId> + <version>${project.version}</version> + <type>zip</type> + </dependency> </dependencies> </project> diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java index cf94bfcb..9fc2c837 100644 --- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java +++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java @@ -457,10 +457,19 @@ public class PolicyContainer implements Startable logger.info("updateToVersion:releaseId " + releaseId.toString()); } - // notify all 'PolicySession' instances + // stop all session threads + for (PolicySession session : sessions.values()) + { + session.stopThread(); + } + + // update the version Results results = kieContainer.updateToVersion(releaseId); + + // restart all session threads, and notify the sessions for (PolicySession session : sessions.values()) { + session.startThread(); session.updated(); } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java index cc3705ee..09ee9a4e 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -29,214 +29,217 @@ import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; import org.onap.policy.drools.event.comm.bus.NoopTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSource; -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; import org.onap.policy.drools.properties.Lockable; import org.onap.policy.drools.properties.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.annotation.JsonIgnore; /** - * Abstraction to managed the system's Networked Topic Endpoints, - * sources of all events input into the System. + * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into + * the System. */ public interface TopicEndpoint extends Startable, Lockable { - - /** - * Add Topic Sources to the communication infrastructure initialized per - * properties - * - * @param properties properties for Topic Source construction - * @return a generic Topic Source - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSource> addTopicSources(Properties properties); - - /** - * Add Topic Sinks to the communication infrastructure initialized per - * properties - * - * @param properties properties for Topic Sink construction - * @return a generic Topic Sink - * @throws IllegalArgumentException when invalid arguments are provided - */ - public List<TopicSink> addTopicSinks(Properties properties); - - /** - * gets all Topic Sources - * @return the Topic Source List - */ - List<TopicSource> getTopicSources(); - - /** - * get the Topic Sources for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source List - * @throws IllegalStateException if the entity is in an invalid state - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSource> getTopicSources(List<String> topicNames); - - /** - * gets the Topic Source for the given topic name and - * underlying communication infrastructure type - * - * @param commType communication infrastructure type - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - * @throws UnsupportedOperationException if the operation is not supported. - */ - public TopicSource getTopicSource(Topic.CommInfrastructure commType, - String topicName) - throws UnsupportedOperationException; - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the UEB Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSource getUebTopicSource(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the DMAAP Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSource getDmaapTopicSource(String topicName); - - /** - * get the Topic Sinks for the given topic name - * - * @param topicNames the topic names - * @return the Topic Sink List - * @throws IllegalStateException - * @throws IllegalArgumentException - */ - public List<TopicSink> getTopicSinks(List<String> topicNames); - - /** - * get the Topic Sinks for the given topic name and - * underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public TopicSink getTopicSink(Topic.CommInfrastructure commType, - String topicName) - throws UnsupportedOperationException; - - /** - * get the Topic Sinks for the given topic name and - * all the underlying communication infrastructure type - * - * @param topicName the topic name - * @param commType communication infrastructure type - * - * @return the Topic Sink List - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicWriters for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<TopicSink> getTopicSinks(String topicName); - - /** - * get the UEB Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public UebTopicSink getUebTopicSink(String topicName); - - /** - * get the no-op Topic Sink for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink getNoopTopicSink(String topicName); - - /** - * get the DMAAP Topic Source for the given topic name - * - * @param topicName the topic name - * - * @return the Topic Source - * @throws IllegalStateException if the entity is in an invalid state, for - * example multiple TopicReaders for a topic name and communication infrastructure - * @throws IllegalArgumentException if invalid parameters are present - */ - public DmaapTopicSink getDmaapTopicSink(String topicName); - - /** - * gets only the UEB Topic Sources - * @return the UEB Topic Source List - */ - public List<UebTopicSource> getUebTopicSources(); - - /** - * gets only the DMAAP Topic Sources - * @return the DMAAP Topic Source List - */ - public List<DmaapTopicSource> getDmaapTopicSources(); - - /** - * gets all Topic Sinks - * @return the Topic Sink List - */ - public List<TopicSink> getTopicSinks(); - - /** - * gets only the UEB Topic Sinks - * @return the UEB Topic Sink List - */ - public List<UebTopicSink> getUebTopicSinks(); - - /** - * gets only the DMAAP Topic Sinks - * @return the DMAAP Topic Sink List - */ - public List<DmaapTopicSink> getDmaapTopicSinks(); - - /** - * gets only the NOOP Topic Sinks - * @return the NOOP Topic Sinks List - */ - public List<NoopTopicSink> getNoopTopicSinks(); - - /** - * singleton for global access - */ - public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); + + /** + * Add Topic Sources to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Source construction + * @return a generic Topic Source + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSource> addTopicSources(Properties properties); + + /** + * Add Topic Sinks to the communication infrastructure initialized per properties + * + * @param properties properties for Topic Sink construction + * @return a generic Topic Sink + * @throws IllegalArgumentException when invalid arguments are provided + */ + public List<TopicSink> addTopicSinks(Properties properties); + + /** + * gets all Topic Sources + * + * @return the Topic Source List + */ + List<TopicSource> getTopicSources(); + + /** + * get the Topic Sources for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source List + * @throws IllegalStateException if the entity is in an invalid state + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSource> getTopicSources(List<String> topicNames); + + /** + * gets the Topic Source for the given topic name and underlying communication infrastructure type + * + * @param commType communication infrastructure type + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + * @throws UnsupportedOperationException if the operation is not supported. + */ + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException; + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the UEB Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSource getUebTopicSource(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the DMAAP Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSource getDmaapTopicSource(String topicName); + + /** + * get the Topic Sinks for the given topic name + * + * @param topicNames the topic names + * @return the Topic Sink List + * @throws IllegalStateException + * @throws IllegalArgumentException + */ + public List<TopicSink> getTopicSinks(List<String> topicNames); + + /** + * get the Topic Sinks for the given topic name and underlying communication infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException; + + /** + * get the Topic Sinks for the given topic name and all the underlying communication + * infrastructure type + * + * @param topicName the topic name + * @param commType communication infrastructure type + * + * @return the Topic Sink List + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicWriters for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<TopicSink> getTopicSinks(String topicName); + + /** + * get the UEB Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public UebTopicSink getUebTopicSink(String topicName); + + /** + * get the no-op Topic Sink for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink getNoopTopicSink(String topicName); + + /** + * get the DMAAP Topic Source for the given topic name + * + * @param topicName the topic name + * + * @return the Topic Source + * @throws IllegalStateException if the entity is in an invalid state, for example multiple + * TopicReaders for a topic name and communication infrastructure + * @throws IllegalArgumentException if invalid parameters are present + */ + public DmaapTopicSink getDmaapTopicSink(String topicName); + + /** + * gets only the UEB Topic Sources + * + * @return the UEB Topic Source List + */ + public List<UebTopicSource> getUebTopicSources(); + + /** + * gets only the DMAAP Topic Sources + * + * @return the DMAAP Topic Source List + */ + public List<DmaapTopicSource> getDmaapTopicSources(); + + /** + * gets all Topic Sinks + * + * @return the Topic Sink List + */ + public List<TopicSink> getTopicSinks(); + + /** + * gets only the UEB Topic Sinks + * + * @return the UEB Topic Sink List + */ + public List<UebTopicSink> getUebTopicSinks(); + + /** + * gets only the DMAAP Topic Sinks + * + * @return the DMAAP Topic Sink List + */ + public List<DmaapTopicSink> getDmaapTopicSinks(); + + /** + * gets only the NOOP Topic Sinks + * + * @return the NOOP Topic Sinks List + */ + public List<NoopTopicSink> getNoopTopicSinks(); + + /** + * singleton for global access + */ + public static final TopicEndpoint manager = new ProxyTopicEndpointManager(); } + /* * ----------------- implementation ------------------- */ @@ -246,398 +249,412 @@ public interface TopicEndpoint extends Startable, Lockable { * implementations according to the communication infrastructure that are supported */ class ProxyTopicEndpointManager implements TopicEndpoint { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); - /** - * Is this element locked? - */ - protected volatile boolean locked = false; - - /** - * Is this element alive? - */ - protected volatile boolean alive = false; - - @Override - public List<TopicSource> addTopicSources(Properties properties) { - - // 1. Create UEB Sources - // 2. Create DMAAP Sources - - List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.build(properties)); - sources.addAll(DmaapTopicSource.factory.build(properties)); - - if (this.isLocked()) { - for (TopicSource source : sources) { - source.lock(); - } - } - - return sources; - } - - @Override - public List<TopicSink> addTopicSinks(Properties properties) { - // 1. Create UEB Sinks - // 2. Create DMAAP Sinks - - List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.build(properties)); - sinks.addAll(DmaapTopicSink.factory.build(properties)); - sinks.addAll(NoopTopicSink.factory.build(properties)); - - if (this.isLocked()) { - for (TopicSink sink : sinks) { - sink.lock(); - } - } - - return sinks; - } - - @Override - public List<TopicSource> getTopicSources() { - - List<TopicSource> sources = new ArrayList<>(); - - sources.addAll(UebTopicSource.factory.inventory()); - sources.addAll(DmaapTopicSource.factory.inventory()); - - return sources; - } - - @Override - public List<TopicSink> getTopicSinks() { - - List<TopicSink> sinks = new ArrayList<>(); - - sinks.addAll(UebTopicSink.factory.inventory()); - sinks.addAll(DmaapTopicSink.factory.inventory()); - sinks.addAll(NoopTopicSink.factory.inventory()); - - return sinks; - } - - @JsonIgnore - @Override - public List<UebTopicSource> getUebTopicSources() { - return UebTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSource> getDmaapTopicSources() { - return DmaapTopicSource.factory.inventory(); - } - - @JsonIgnore - @Override - public List<UebTopicSink> getUebTopicSinks() { - return UebTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<DmaapTopicSink> getDmaapTopicSinks() { - return DmaapTopicSink.factory.inventory(); - } - - @JsonIgnore - @Override - public List<NoopTopicSink> getNoopTopicSinks() { - return NoopTopicSink.factory.inventory(); - } - - @Override - public boolean start() { - - synchronized (this) { - if (this.locked) { - throw new IllegalStateException(this + " is locked"); - } - - if (this.alive) { - return true; - } - - this.alive = true; - } - - List<Startable> endpoints = getEndpoints(); - - boolean success = true; - for (Startable endpoint: endpoints) { - try { - success = endpoint.start() && success; - } catch (Exception e) { - success = false; - logger.error("Problem starting endpoint: {}", endpoint, e); - } - } - - return success; - } - - - @Override - public boolean stop() { - - /* - * stop regardless if it is locked, in other - * words, stop operation has precedence over - * locks. - */ - synchronized (this) { - this.alive = false; - } - - List<Startable> endpoints = getEndpoints(); - - boolean success = true; - for (Startable endpoint: endpoints) { - try { - success = endpoint.stop() && success; - } catch (Exception e) { - success = false; - logger.error("Problem stopping endpoint: {}", endpoint, e); - } - } - - return success; - } - - /** - * - * @return list of managed endpoints - */ - @JsonIgnore - protected List<Startable> getEndpoints() { - List<Startable> endpoints = new ArrayList<>(); - - endpoints.addAll(this.getTopicSources()); - endpoints.addAll(this.getTopicSinks()); - - return endpoints; - } - - @Override - public void shutdown() { - UebTopicSource.factory.destroy(); - UebTopicSink.factory.destroy(); - - DmaapTopicSource.factory.destroy(); - DmaapTopicSink.factory.destroy(); - } - - @Override - public boolean isAlive() { - return this.alive; - } - - @Override - public boolean lock() { - - synchronized (this) { - if (locked) - return true; - - this.locked = true; - } - - for (TopicSource source: this.getTopicSources()) { - source.lock(); - } - - for (TopicSink sink: this.getTopicSinks()) { - sink.lock(); - } - - return true; - } - - @Override - public boolean unlock() { - synchronized (this) { - if (!locked) - return true; - - this.locked = false; - } - - for (TopicSource source: this.getTopicSources()) { - source.unlock(); - } - - for (TopicSink sink: this.getTopicSinks()) { - sink.unlock(); - } - - return true; - } - - @Override - public boolean isLocked() { - return this.locked; - } - - @Override - public List<TopicSource> getTopicSources(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - List<TopicSource> sources = new ArrayList<>(); - for (String topic: topicNames) { - try { - TopicSource uebSource = this.getUebTopicSource(topic); - if (uebSource != null) - sources.add(uebSource); - } catch (Exception e) { - logger.info("No UEB source for topic: {}", topic, e); - } - - try { - TopicSource dmaapSource = this.getDmaapTopicSource(topic); - if (dmaapSource != null) - sources.add(dmaapSource); - } catch (Exception e) { - logger.info("No DMAAP source for topic: {}", topic, e); - } - } - return sources; - } - - @Override - public List<TopicSink> getTopicSinks(List<String> topicNames) { - - if (topicNames == null) { - throw new IllegalArgumentException("must provide a list of topics"); - } - - List<TopicSink> sinks = new ArrayList<>(); - for (String topic: topicNames) { - try { - TopicSink uebSink = this.getUebTopicSink(topic); - if (uebSink != null) - sinks.add(uebSink); - } catch (Exception e) { - logger.info("No UEB sink for topic: {}", topic, e); - } - - try { - TopicSink dmaapSink = this.getDmaapTopicSink(topic); - if (dmaapSink != null) - sinks.add(dmaapSink); - } catch (Exception e) { - logger.info("No DMAAP sink for topic: {}", topic, e); - } - } - return sinks; - } - - @Override - public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) - throws UnsupportedOperationException { - - if (commType == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSource(topicName); - case DMAAP: - return this.getDmaapTopicSource(topicName); - case REST: - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) - throws UnsupportedOperationException { - if (commType == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - switch (commType) { - case UEB: - return this.getUebTopicSink(topicName); - case DMAAP: - return this.getDmaapTopicSink(topicName); - case REST: - default: - throw new UnsupportedOperationException("Unsupported " + commType.name()); - } - } - - @Override - public List<TopicSink> getTopicSinks(String topicName) { - - if (topicName == null) { - throw new IllegalArgumentException - ("Invalid parameter: a communication infrastructure required to fetch " + topicName); - } - - List<TopicSink> sinks = new ArrayList<>(); - - try { - sinks.add(this.getUebTopicSink(topicName)); - } catch (Exception e) { - logger.debug("No sink for topic: {}", topicName, e); - } - - try { - sinks.add(this.getDmaapTopicSink(topicName)); - } catch (Exception e) { - logger.debug("No sink for topic: {}", topicName, e); - } - - return sinks; - } - - @Override - public UebTopicSource getUebTopicSource(String topicName) { - return UebTopicSource.factory.get(topicName); - } - - @Override - public UebTopicSink getUebTopicSink(String topicName) { - return UebTopicSink.factory.get(topicName); - } - - @Override - public DmaapTopicSource getDmaapTopicSource(String topicName) { - return DmaapTopicSource.factory.get(topicName); - } - - @Override - public DmaapTopicSink getDmaapTopicSink(String topicName) { - return DmaapTopicSink.factory.get(topicName); - } - - @Override - public NoopTopicSink getNoopTopicSink(String topicName) { - return NoopTopicSink.factory.get(topicName); - } - + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class); + /** + * Is this element locked? + */ + protected volatile boolean locked = false; + + /** + * Is this element alive? + */ + protected volatile boolean alive = false; + + @Override + public List<TopicSource> addTopicSources(Properties properties) { + + // 1. Create UEB Sources + // 2. Create DMAAP Sources + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.build(properties)); + sources.addAll(DmaapTopicSource.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSource source : sources) { + source.lock(); + } + } + + return sources; + } + + @Override + public List<TopicSink> addTopicSinks(Properties properties) { + // 1. Create UEB Sinks + // 2. Create DMAAP Sinks + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.build(properties)); + sinks.addAll(DmaapTopicSink.factory.build(properties)); + sinks.addAll(NoopTopicSink.factory.build(properties)); + + if (this.isLocked()) { + for (final TopicSink sink : sinks) { + sink.lock(); + } + } + + return sinks; + } + + @Override + public List<TopicSource> getTopicSources() { + + final List<TopicSource> sources = new ArrayList<>(); + + sources.addAll(UebTopicSource.factory.inventory()); + sources.addAll(DmaapTopicSource.factory.inventory()); + + return sources; + } + + @Override + public List<TopicSink> getTopicSinks() { + + final List<TopicSink> sinks = new ArrayList<>(); + + sinks.addAll(UebTopicSink.factory.inventory()); + sinks.addAll(DmaapTopicSink.factory.inventory()); + sinks.addAll(NoopTopicSink.factory.inventory()); + + return sinks; + } + + @JsonIgnore + @Override + public List<UebTopicSource> getUebTopicSources() { + return UebTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSource> getDmaapTopicSources() { + return DmaapTopicSource.factory.inventory(); + } + + @JsonIgnore + @Override + public List<UebTopicSink> getUebTopicSinks() { + return UebTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<DmaapTopicSink> getDmaapTopicSinks() { + return DmaapTopicSink.factory.inventory(); + } + + @JsonIgnore + @Override + public List<NoopTopicSink> getNoopTopicSinks() { + return NoopTopicSink.factory.inventory(); + } + + @Override + public boolean start() { + + synchronized (this) { + if (this.locked) { + throw new IllegalStateException(this + " is locked"); + } + + if (this.alive) { + return true; + } + + this.alive = true; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.start() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem starting endpoint: {}", endpoint, e); + } + } + + return success; + } + + + @Override + public boolean stop() { + + /* + * stop regardless if it is locked, in other words, stop operation has precedence over locks. + */ + synchronized (this) { + this.alive = false; + } + + final List<Startable> endpoints = this.getEndpoints(); + + boolean success = true; + for (final Startable endpoint : endpoints) { + try { + success = endpoint.stop() && success; + } catch (final Exception e) { + success = false; + logger.error("Problem stopping endpoint: {}", endpoint, e); + } + } + + return success; + } + + /** + * + * @return list of managed endpoints + */ + @JsonIgnore + protected List<Startable> getEndpoints() { + final List<Startable> endpoints = new ArrayList<>(); + + endpoints.addAll(this.getTopicSources()); + endpoints.addAll(this.getTopicSinks()); + + return endpoints; + } + + @Override + public void shutdown() { + UebTopicSource.factory.destroy(); + UebTopicSink.factory.destroy(); + NoopTopicSink.factory.destroy(); + + DmaapTopicSource.factory.destroy(); + DmaapTopicSink.factory.destroy(); + } + + @Override + public boolean isAlive() { + return this.alive; + } + + @Override + public boolean lock() { + + synchronized (this) { + if (this.locked) + return true; + + this.locked = true; + } + + for (final TopicSource source : this.getTopicSources()) { + source.lock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.lock(); + } + + return true; + } + + @Override + public boolean unlock() { + synchronized (this) { + if (!this.locked) + return true; + + this.locked = false; + } + + for (final TopicSource source : this.getTopicSources()) { + source.unlock(); + } + + for (final TopicSink sink : this.getTopicSinks()) { + sink.unlock(); + } + + return true; + } + + @Override + public boolean isLocked() { + return this.locked; + } + + @Override + public List<TopicSource> getTopicSources(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSource> sources = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSource uebSource = this.getUebTopicSource(topic); + if (uebSource != null) + sources.add(uebSource); + } catch (final Exception e) { + logger.debug("No UEB source for topic: {}", topic, e); + } + + try { + final TopicSource dmaapSource = this.getDmaapTopicSource(topic); + if (dmaapSource != null) + sources.add(dmaapSource); + } catch (final Exception e) { + logger.debug("No DMAAP source for topic: {}", topic, e); + } + } + return sources; + } + + @Override + public List<TopicSink> getTopicSinks(List<String> topicNames) { + + if (topicNames == null) { + throw new IllegalArgumentException("must provide a list of topics"); + } + + final List<TopicSink> sinks = new ArrayList<>(); + for (final String topic : topicNames) { + try { + final TopicSink uebSink = this.getUebTopicSink(topic); + if (uebSink != null) + sinks.add(uebSink); + } catch (final Exception e) { + logger.debug("No UEB sink for topic: {}", topic, e); + } + + try { + final TopicSink dmaapSink = this.getDmaapTopicSink(topic); + if (dmaapSink != null) + sinks.add(dmaapSink); + } catch (final Exception e) { + logger.debug("No DMAAP sink for topic: {}", topic, e); + } + + try { + final TopicSink noopSink = this.getNoopTopicSink(topic); + if (noopSink != null) + sinks.add(noopSink); + } catch (final Exception e) { + logger.debug("No NOOP sink for topic: {}", topic, e); + } + } + return sinks; + } + + @Override + public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException { + + if (commType == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSource(topicName); + case DMAAP: + return this.getDmaapTopicSource(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName) + throws UnsupportedOperationException { + if (commType == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + switch (commType) { + case UEB: + return this.getUebTopicSink(topicName); + case DMAAP: + return this.getDmaapTopicSink(topicName); + case NOOP: + return this.getNoopTopicSink(topicName); + case REST: + default: + throw new UnsupportedOperationException("Unsupported " + commType.name()); + } + } + + @Override + public List<TopicSink> getTopicSinks(String topicName) { + if (topicName == null) { + throw new IllegalArgumentException( + "Invalid parameter: a communication infrastructure required to fetch " + topicName); + } + + final List<TopicSink> sinks = new ArrayList<>(); + + try { + sinks.add(this.getUebTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + try { + sinks.add(this.getDmaapTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + try { + sinks.add(this.getNoopTopicSink(topicName)); + } catch (final Exception e) { + logger.debug("No sink for topic: {}", topicName, e); + } + + return sinks; + } + + @Override + public UebTopicSource getUebTopicSource(String topicName) { + return UebTopicSource.factory.get(topicName); + } + + @Override + public UebTopicSink getUebTopicSink(String topicName) { + return UebTopicSink.factory.get(topicName); + } + + @Override + public DmaapTopicSource getDmaapTopicSource(String topicName) { + return DmaapTopicSource.factory.get(topicName); + } + + @Override + public DmaapTopicSink getDmaapTopicSink(String topicName) { + return DmaapTopicSink.factory.get(topicName); + } + + @Override + public NoopTopicSink getNoopTopicSink(String topicName) { + return NoopTopicSink.factory.get(topicName); + } + } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java index 946e48c0..53315391 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,191 +33,194 @@ import org.slf4j.LoggerFactory; * Noop Topic Sink Factory */ public interface NoopTopicSinkFactory { - - /** - * Creates noop topic sinks based on properties files - * - * @param properties Properties containing initialization values - * - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public List<NoopTopicSink> build(Properties properties); - - /** - * builds a noop sink - * - * @param servers list of servers - * @param topic topic name - * @param managed is this sink endpoint managed? - * @return a noop topic sink - * @throws IllegalArgumentException if invalid parameters are present - */ - public NoopTopicSink build(List<String> servers, String topic, boolean managed); - - /** - * Destroys a sink based on the topic - * - * @param topic topic name - * @throws IllegalArgumentException if invalid parameters are present - */ - public void destroy(String topic); - - /** - * gets a sink based on topic name - * @param topic the topic name - * - * @return a sink with topic name - * @throws IllegalArgumentException if an invalid topic is provided - * @throws IllegalStateException if the sink is in an incorrect state - */ - public NoopTopicSink get(String topic); - - /** - * Provides a snapshot of the UEB Topic Writers - * @return a list of the UEB Topic Writers - */ - public List<NoopTopicSink> inventory(); - - /** - * Destroys all sinks - */ - public void destroy(); + + /** + * Creates noop topic sinks based on properties files + * + * @param properties Properties containing initialization values + * + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public List<NoopTopicSink> build(Properties properties); + + /** + * builds a noop sink + * + * @param servers list of servers + * @param topic topic name + * @param managed is this sink endpoint managed? + * @return a noop topic sink + * @throws IllegalArgumentException if invalid parameters are present + */ + public NoopTopicSink build(List<String> servers, String topic, boolean managed); + + /** + * Destroys a sink based on the topic + * + * @param topic topic name + * @throws IllegalArgumentException if invalid parameters are present + */ + public void destroy(String topic); + + /** + * gets a sink based on topic name + * + * @param topic the topic name + * + * @return a sink with topic name + * @throws IllegalArgumentException if an invalid topic is provided + * @throws IllegalStateException if the sink is in an incorrect state + */ + public NoopTopicSink get(String topic); + + /** + * Provides a snapshot of the UEB Topic Writers + * + * @return a list of the UEB Topic Writers + */ + public List<NoopTopicSink> inventory(); + + /** + * Destroys all sinks + */ + public void destroy(); } + /* ------------- implementation ----------------- */ /** * Factory of noop sinks */ class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory { - /** - * Logger - */ - private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); - - /** - * noop topic sinks map - */ - protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); - - @Override - public List<NoopTopicSink> build(Properties properties) { - - String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); - if (sinkTopics == null || sinkTopics.isEmpty()) { - logger.info("{}: no topic for noop sink", this); - return new ArrayList<>(); - } - - List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); - List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>(); - synchronized(this) { - for (String topic: sinkTopicList) { - if (this.noopTopicSinks.containsKey(topic)) { - newSinks.add(this.noopTopicSinks.get(topic)); - continue; - } - - String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + - topic + - PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); - - if (servers == null || servers.isEmpty()) - servers = "noop"; - - List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); - - String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic + - PolicyProperties.PROPERTY_MANAGED_SUFFIX); - boolean managed = true; - if (managedString != null && !managedString.isEmpty()) { - managed = Boolean.parseBoolean(managedString); - } - - NoopTopicSink noopSink = this.build(serverList, topic, managed); - newSinks.add(noopSink); - } - return newSinks; - } - } - - @Override - public NoopTopicSink build(List<String> servers, String topic, boolean managed) { - if (servers == null) { - servers = new ArrayList<>(); - } - - if (servers.isEmpty()) { - servers.add("noop"); - } - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - synchronized (this) { - if (noopTopicSinks.containsKey(topic)) { - return noopTopicSinks.get(topic); - } - - NoopTopicSink sink = - new NoopTopicSink(servers, topic); - - if (managed) - noopTopicSinks.put(topic, sink); - - return sink; - } - } - - @Override - public void destroy(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - NoopTopicSink noopSink; - synchronized(this) { - if (!noopTopicSinks.containsKey(topic)) { - return; - } - - noopSink = noopTopicSinks.remove(topic); - } - - noopSink.shutdown(); - } - - @Override - public void destroy() { - List<NoopTopicSink> sinks = this.inventory(); - for (NoopTopicSink sink: sinks) { - sink.shutdown(); - } - - synchronized(this) { - this.noopTopicSinks.clear(); - } - } - - @Override - public NoopTopicSink get(String topic) { - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("A topic must be provided"); - } - - synchronized(this) { - if (noopTopicSinks.containsKey(topic)) { - return noopTopicSinks.get(topic); - } else { - throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); - } - } - } - - @Override - public List<NoopTopicSink> inventory() { - return new ArrayList<>(this.noopTopicSinks.values()); - } + /** + * Logger + */ + private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class); + + /** + * noop topic sinks map + */ + protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>(); + + @Override + public List<NoopTopicSink> build(Properties properties) { + + final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS); + if (sinkTopics == null || sinkTopics.isEmpty()) { + logger.info("{}: no topic for noop sink", this); + return new ArrayList<>(); + } + + final List<String> sinkTopicList = + new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*"))); + final List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>(); + synchronized (this) { + for (final String topic : sinkTopicList) { + if (this.noopTopicSinks.containsKey(topic)) { + newSinks.add(this.noopTopicSinks.get(topic)); + continue; + } + + String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX); + + if (servers == null || servers.isEmpty()) + servers = "noop"; + + final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*"))); + + final String managedString = + properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic + + PolicyProperties.PROPERTY_MANAGED_SUFFIX); + boolean managed = true; + if (managedString != null && !managedString.isEmpty()) { + managed = Boolean.parseBoolean(managedString); + } + + final NoopTopicSink noopSink = this.build(serverList, topic, managed); + newSinks.add(noopSink); + } + return newSinks; + } + } + + @Override + public NoopTopicSink build(List<String> servers, String topic, boolean managed) { + if (servers == null) { + servers = new ArrayList<>(); + } + + if (servers.isEmpty()) { + servers.add("noop"); + } + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } + + final NoopTopicSink sink = new NoopTopicSink(servers, topic); + + if (managed) + this.noopTopicSinks.put(topic, sink); + + return sink; + } + } + + @Override + public void destroy(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + NoopTopicSink noopSink; + synchronized (this) { + if (!this.noopTopicSinks.containsKey(topic)) { + return; + } + + noopSink = this.noopTopicSinks.remove(topic); + } + + noopSink.shutdown(); + } + + @Override + public void destroy() { + final List<NoopTopicSink> sinks = this.inventory(); + for (final NoopTopicSink sink : sinks) { + sink.shutdown(); + } + + synchronized (this) { + this.noopTopicSinks.clear(); + } + } + + @Override + public NoopTopicSink get(String topic) { + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("A topic must be provided"); + } + + synchronized (this) { + if (this.noopTopicSinks.containsKey(topic)) { + return this.noopTopicSinks.get(topic); + } else { + throw new IllegalStateException("DmaapTopicSink for " + topic + " not found"); + } + } + } + + @Override + public List<NoopTopicSink> inventory() { + return new ArrayList<>(this.noopTopicSinks.values()); + } } diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java index edb03bba..8171c35d 100644 --- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java +++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -34,427 +34,432 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.impl.MRConsumerImpl; import com.att.nsa.mr.client.response.MRConsumerResponse; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; -import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder; /** * Wrapper around libraries to consume from message bus * */ public interface BusConsumer { - - /** - * fetch messages - * - * @return list of messages - * @throws Exception when error encountered by underlying libraries - */ - public Iterable<String> fetch() throws InterruptedException, IOException; - - /** - * close underlying library consumer - */ - public void close(); - - /** - * Cambria based consumer - */ - public static class CambriaConsumerWrapper implements BusConsumer { - - /** - * Cambria client - */ - protected CambriaConsumer consumer; - - /** - * Cambria Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws GeneralSecurityException - * @throws MalformedURLException - */ - public CambriaConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) - throws IllegalArgumentException { - - ConsumerBuilder builder = - new CambriaClientBuilders.ConsumerBuilder(); - - - if (useHttps){ - - if(useSelfSignedCerts){ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit) - .usingHttps() - .allowSelfSignedCertificates(); - } - else{ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit) - .usingHttps(); - } - } - else{ - builder.knownAs(consumerGroup, consumerInstance) - .usingHosts(servers) - .onTopic(topic) - .waitAtServer(fetchTimeout) - .receivingAtMost(fetchLimit); - } - - if (apiKey != null && !apiKey.isEmpty() && - apiSecret != null && !apiSecret.isEmpty()) { - builder.authenticatedBy(apiKey, apiSecret); - } - - try { - this.consumer = builder.build(); - } catch (MalformedURLException | GeneralSecurityException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public Iterable<String> fetch() throws IOException { - return this.consumer.fetch(); - } - - @Override - public void close() { - this.consumer.close(); - } - - @Override - public String toString() { - return "CambriaConsumerWrapper []"; - } - } - - /** - * MR based consumer - */ - public abstract class DmaapConsumerWrapper implements BusConsumer { - - /** - * logger - */ - private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); - - /** - * fetch timeout - */ - protected int fetchTimeout; - - /** - * close condition - */ - protected Object closeCondition = new Object(); - - /** - * MR Consumer - */ - protected MRConsumerImpl consumer; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param username AAF Login - * @param password AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String username, String password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) - throws MalformedURLException { - - this.fetchTimeout = fetchTimeout; - - if (topic == null || topic.isEmpty()) { - throw new IllegalArgumentException("No topic for DMaaP"); - } - - this.consumer = new MRConsumerImpl(servers, topic, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, - null, apiKey, apiSecret); - - this.consumer.setUsername(username); - this.consumer.setPassword(password); - } - - @Override - public Iterable<String> fetch() throws InterruptedException, IOException { - MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); - if (response == null) { - logger.warn("{}: DMaaP NULL response received", this); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - return new ArrayList<>(); - } else { - logger.debug("DMaaP consumer received {} : {}" + - response.getResponseCode(), - response.getResponseMessage()); - - if (response.getResponseCode() == null || - !response.getResponseCode().equals("200")) { - - logger.error("DMaaP consumer received: {} : {}", - response.getResponseCode(), - response.getResponseMessage()); - - synchronized (closeCondition) { - closeCondition.wait(fetchTimeout); - } - - /* fall through */ - } - } - - if (response.getActualMessages() == null) - return new ArrayList<>(); - else - return response.getActualMessages(); - } - - @Override - public void close() { - synchronized (closeCondition) { - closeCondition.notifyAll(); - } - - this.consumer.close(); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } - - /** - * MR based consumer - */ - public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); - - private Properties props; - - /** - * MR Consumer Wrapper - * - * @param servers messaging bus hosts - * @param topic topic - * @param apiKey API Key - * @param apiSecret API Secret - * @param aafLogin AAF Login - * @param aafPassword AAF Password - * @param consumerGroup Consumer Group - * @param consumerInstance Consumer Instance - * @param fetchTimeout Fetch Timeout - * @param fetchLimit Fetch Limit - * @throws MalformedURLException - */ - public DmaapAafConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String aafLogin, String aafPassword, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { - - super(servers, topic, apiKey, apiSecret, - aafLogin, aafPassword, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps); - - // super constructor sets servers = {""} if empty to avoid errors when using DME2 - if ((servers.size() == 1 && servers.get(0).equals("")) || - (servers == null) || (servers.isEmpty())) { - throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); - } - - this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); - - props = new Properties(); - - if(useHttps){ - props.setProperty("Protocol", "https"); - this.consumer.setHost(servers.get(0) + ":3905"); - - } - else{ - props.setProperty("Protocol", "http"); - this.consumer.setHost(servers.get(0) + ":3904"); - } - - this.consumer.setProps(props); - logger.info("{}: CREATION", this); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - MRConsumerImpl consumer = (MRConsumerImpl) this.consumer; - - builder. - append("DmaapConsumerWrapper ["). - append("consumer.getAuthDate()=").append(consumer.getAuthDate()). - append(", consumer.getAuthKey()=").append(consumer.getAuthKey()). - append(", consumer.getHost()=").append(consumer.getHost()). - append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()). - append(", consumer.getUsername()=").append(consumer.getUsername()). - append("]"); - return builder.toString(); - } - } - - public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { - - private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); - - private Properties props; - - public DmaapDmeConsumerWrapper(List<String> servers, String topic, - String apiKey, String apiSecret, - String dme2Login, String dme2Password, - String consumerGroup, String consumerInstance, - int fetchTimeout, int fetchLimit, - String environment, String aftEnvironment, String dme2Partner, - String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException { - - - - super(servers, topic, apiKey, apiSecret, - dme2Login, dme2Password, - consumerGroup, consumerInstance, - fetchTimeout, fetchLimit, useHttps); - - - String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); - - if (environment == null || environment.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); - } if (aftEnvironment == null || aftEnvironment.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP"); - } if (latitude == null || latitude.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); - } if (longitude == null || longitude.isEmpty()) { - throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP"); - } - - if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { - throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + - "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + - PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); - } - - String serviceName = servers.get(0); - - this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); - - this.consumer.setUsername(dme2Login); - this.consumer.setPassword(dme2Password); - - props = new Properties(); - - props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); - - props.setProperty("username", dme2Login); - props.setProperty("password", dme2Password); - - /* These are required, no defaults */ - props.setProperty("topic", topic); - - props.setProperty("Environment", environment); - props.setProperty("AFT_ENVIRONMENT", aftEnvironment); - - if (dme2Partner != null) - props.setProperty("Partner", dme2Partner); - if (dme2RouteOffer != null) - props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); - - props.setProperty("Latitude", latitude); - props.setProperty("Longitude", longitude); - - /* These are optional, will default to these values if not set in additionalProps */ - props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); - props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); - props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); - props.setProperty("Version", "1.0"); - props.setProperty("SubContextPath", "/"); - props.setProperty("sessionstickinessrequired", "no"); - - /* These should not change */ - props.setProperty("TransportType", "DME2"); - props.setProperty("MethodType", "GET"); - - if(useHttps){ - props.setProperty("Protocol", "https"); - - } - else{ - props.setProperty("Protocol", "http"); - } - - props.setProperty("contenttype", "application/json"); - - if (additionalProps != null) { - for(String key : additionalProps.keySet()) - props.put(key, additionalProps.get(key)); - } - - MRClientFactory.prop = props; - this.consumer.setProps(props); - - logger.info("{}: CREATION", this); - } - } -} + /** + * fetch messages + * + * @return list of messages + * @throws Exception when error encountered by underlying libraries + */ + public Iterable<String> fetch() throws InterruptedException, IOException; + + /** + * close underlying library consumer + */ + public void close(); + + /** + * Cambria based consumer + */ + public static class CambriaConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class); + + /** + * Cambria client + */ + protected CambriaConsumer consumer; + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * Cambria Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws GeneralSecurityException + * @throws MalformedURLException + */ + public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout, + int fetchLimit, boolean useHttps, boolean useSelfSignedCerts) + throws IllegalArgumentException { + + this.fetchTimeout = fetchTimeout; + + final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder(); + + if (useHttps) { + + if (useSelfSignedCerts) { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps() + .allowSelfSignedCertificates(); + } else { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps(); + } + } else { + builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic) + .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit); + } + + if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) { + builder.authenticatedBy(apiKey, apiSecret); + } + + try { + this.consumer = builder.build(); + } catch (MalformedURLException | GeneralSecurityException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public Iterable<String> fetch() throws IOException, InterruptedException { + try { + return this.consumer.fetch(); + } catch (final IOException e) { + logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(), + this.fetchTimeout); + synchronized (this.closeCondition) { + this.closeCondition.wait(this.fetchTimeout); + } + + throw e; + } + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]"); + return builder.toString(); + } + } + + /** + * MR based consumer + */ + public abstract class DmaapConsumerWrapper implements BusConsumer { + + /** + * logger + */ + private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class); + + /** + * fetch timeout + */ + protected int fetchTimeout; + + /** + * close condition + */ + protected Object closeCondition = new Object(); + + /** + * MR Consumer + */ + protected MRConsumerImpl consumer; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param username AAF Login + * @param password AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret, + String username, String password, String consumerGroup, String consumerInstance, + int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException { + + this.fetchTimeout = fetchTimeout; + + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException("No topic for DMaaP"); + } + + this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance, + fetchTimeout, fetchLimit, null, apiKey, apiSecret); + + this.consumer.setUsername(username); + this.consumer.setPassword(password); + } + + @Override + public Iterable<String> fetch() throws InterruptedException, IOException { + final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse(); + if (response == null) { + logger.warn("{}: DMaaP NULL response received", this); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + return new ArrayList<>(); + } else { + logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(), + response.getResponseMessage()); + + if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) { + + logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), + response.getResponseMessage()); + + synchronized (closeCondition) { + closeCondition.wait(fetchTimeout); + } + + /* fall through */ + } + } + + if (response.getActualMessages() == null) + return new ArrayList<>(); + else + return response.getActualMessages(); + } + + @Override + public void close() { + synchronized (closeCondition) { + closeCondition.notifyAll(); + } + + this.consumer.close(); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") + .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") + .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) + .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) + .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } + /** + * MR based consumer + */ + public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class); + + private final Properties props; + + /** + * MR Consumer Wrapper + * + * @param servers messaging bus hosts + * @param topic topic + * @param apiKey API Key + * @param apiSecret API Secret + * @param aafLogin AAF Login + * @param aafPassword AAF Password + * @param consumerGroup Consumer Group + * @param consumerInstance Consumer Instance + * @param fetchTimeout Fetch Timeout + * @param fetchLimit Fetch Limit + * @throws MalformedURLException + */ + public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String aafLogin, String aafPassword, String consumerGroup, + String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps) + throws MalformedURLException { + + super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, useHttps); + + // super constructor sets servers = {""} if empty to avoid errors when using DME2 + if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null) + || (servers.isEmpty())) { + throw new IllegalArgumentException("Must provide at least one host for HTTP AAF"); + } + + this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue()); + + props = new Properties(); + + if (useHttps) { + props.setProperty("Protocol", "https"); + this.consumer.setHost(servers.get(0) + ":3905"); + + } else { + props.setProperty("Protocol", "http"); + this.consumer.setHost(servers.get(0) + ":3904"); + } + + this.consumer.setProps(props); + logger.info("{}: CREATION", this); + } + + @Override + public String toString() { + final StringBuilder builder = new StringBuilder(); + final MRConsumerImpl consumer = this.consumer; + + builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=") + .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=") + .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost()) + .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()) + .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]"); + return builder.toString(); + } + } + + public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper { + + private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class); + + private final Properties props; + + public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey, + String apiSecret, String dme2Login, String dme2Password, String consumerGroup, + String consumerInstance, int fetchTimeout, int fetchLimit, String environment, + String aftEnvironment, String dme2Partner, String latitude, String longitude, + Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException { + + + + super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup, + consumerInstance, fetchTimeout, fetchLimit, useHttps); + + + final String dme2RouteOffer = + additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY); + + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + + " property for DME2 in DMaaP"); + } + if (aftEnvironment == null || aftEnvironment.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + + " property for DME2 in DMaaP"); + } + if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException("Missing " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP"); + } + if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException( + "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + + " property for DME2 in DMaaP"); + } + + if ((dme2Partner == null || dme2Partner.isEmpty()) + && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) { + throw new IllegalArgumentException( + "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " + + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2"); + } + + final String serviceName = servers.get(0); + + this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue()); + + this.consumer.setUsername(dme2Login); + this.consumer.setPassword(dme2Password); + + props = new Properties(); + + props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName); + + props.setProperty("username", dme2Login); + props.setProperty("password", dme2Password); + + /* These are required, no defaults */ + props.setProperty("topic", topic); + + props.setProperty("Environment", environment); + props.setProperty("AFT_ENVIRONMENT", aftEnvironment); + + if (dme2Partner != null) + props.setProperty("Partner", dme2Partner); + if (dme2RouteOffer != null) + props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer); + + props.setProperty("Latitude", latitude); + props.setProperty("Longitude", longitude); + + /* These are optional, will default to these values if not set in additionalProps */ + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000"); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000"); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000"); + props.setProperty("Version", "1.0"); + props.setProperty("SubContextPath", "/"); + props.setProperty("sessionstickinessrequired", "no"); + + /* These should not change */ + props.setProperty("TransportType", "DME2"); + props.setProperty("MethodType", "GET"); + + if (useHttps) { + props.setProperty("Protocol", "https"); + + } else { + props.setProperty("Protocol", "http"); + } + + props.setProperty("contenttype", "application/json"); + + if (additionalProps != null) { + for (final String key : additionalProps.keySet()) + props.put(key, additionalProps.get(key)); + } + + MRClientFactory.prop = props; + this.consumer.setProps(props); + + logger.info("{}: CREATION", this); + } + } +} diff --git a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java index f7ef7bcf..dd9a7c2b 100644 --- a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java +++ b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,7 @@ package org.onap.policy.drools.http.server.test; import static org.junit.Assert.assertTrue; +import java.io.IOException; import java.util.ArrayList; import java.util.Properties; @@ -33,186 +34,170 @@ import org.junit.Test; import org.onap.policy.drools.http.client.HttpClient; import org.onap.policy.drools.http.server.HttpServletServer; import org.onap.policy.drools.properties.PolicyProperties; +import org.onap.policy.drools.utils.NetworkUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class HttpClientTest { - - private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class); - - @BeforeClass - public static void setUp() throws InterruptedException { - logger.info("-- setup() --"); - - /* echo server */ - - HttpServletServer echoServerNoAuth = HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true); - echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); - echoServerNoAuth.waitedStart(5000); - - /* no auth echo server */ - - HttpServletServer echoServerAuth = HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true); - echoServerAuth.setBasicAuthentication("x", "y", null); - echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); - echoServerAuth.waitedStart(5000); - } - - @AfterClass - public static void tearDown() { - logger.info("-- tearDown() --"); - - HttpServletServer.factory.destroy(); - HttpClient.factory.destroy(); - } - - @Test - public void testHttpNoAuthClient() throws Exception { - logger.info("-- testHttpNoAuthClient() --"); - - HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false, - "localhost", 6666, "junit/echo", - null, null, true); - Response response = client.get("hello"); - String body = HttpClient.getBody(response, String.class); - - assertTrue(response.getStatus() == 200); - assertTrue(body.equals("hello")); - } - - @Test - public void testHttpAuthClient() throws Exception { - logger.info("-- testHttpAuthClient() --"); - - HttpClient client = HttpClient.factory.build("testHttpAuthClient",false, false, - "localhost", 6667, "junit/echo", - "x", "y", true); - Response response = client.get("hello"); - String body = HttpClient.getBody(response, String.class); - - assertTrue(response.getStatus() == 200); - assertTrue(body.equals("hello")); - } - - @Test - public void testHttpAuthClient401() throws Exception { - logger.info("-- testHttpAuthClient401() --"); - - HttpClient client = HttpClient.factory.build("testHttpAuthClient401",false, false, - "localhost", 6667, "junit/echo", - null, null, true); - Response response = client.get("hello"); - assertTrue(response.getStatus() == 401); - } - - @Test - public void testHttpAuthClientProps() throws Exception { - logger.info("-- testHttpAuthClientProps() --"); - - Properties httpProperties = new Properties(); - - httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7777"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpap"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, - RestMockHealthCheck.class.getName()); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7778"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpdp"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, - RestMockHealthCheck.class.getName()); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7777"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, - "pap/test"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, - "false"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpap"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, - "localhost"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, - "7778"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, - "pdp"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, - "false"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, - "testpdp"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, - "alpha123"); - httpProperties.setProperty - (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, - "true"); - - ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties); - assertTrue(servers.size() == 2); - - ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties); - assertTrue(clients.size() == 2); - - for (HttpServletServer server: servers) { - server.waitedStart(5000); - } - - HttpClient clientPAP = HttpClient.factory.get("PAP"); - Response response = clientPAP.get(); - assertTrue(response.getStatus() == 200); - - HttpClient clientPDP = HttpClient.factory.get("PDP"); - Response response2 = clientPDP.get("test"); - assertTrue(response2.getStatus() == 500); + + private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class); + + @BeforeClass + public static void setUp() throws InterruptedException, IOException { + logger.info("-- setup() --"); + + /* echo server */ + + final HttpServletServer echoServerNoAuth = + HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true); + echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); + echoServerNoAuth.waitedStart(5000); + + if (!NetworkUtil.isTcpPortOpen("localhost", echoServerNoAuth.getPort(), 5, 10000L)) + throw new IllegalStateException("cannot connect to port " + echoServerNoAuth.getPort()); + + /* no auth echo server */ + + final HttpServletServer echoServerAuth = + HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true); + echoServerAuth.setBasicAuthentication("x", "y", null); + echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName()); + echoServerAuth.waitedStart(5000); + + if (!NetworkUtil.isTcpPortOpen("localhost", echoServerAuth.getPort(), 5, 10000L)) + throw new IllegalStateException("cannot connect to port " + echoServerAuth.getPort()); + } + + @AfterClass + public static void tearDown() { + logger.info("-- tearDown() --"); + + HttpServletServer.factory.destroy(); + HttpClient.factory.destroy(); + } + + @Test + public void testHttpNoAuthClient() throws Exception { + logger.info("-- testHttpNoAuthClient() --"); + + final HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false, + "localhost", 6666, "junit/echo", null, null, true); + final Response response = client.get("hello"); + final String body = HttpClient.getBody(response, String.class); + + assertTrue(response.getStatus() == 200); + assertTrue(body.equals("hello")); + } + + @Test + public void testHttpAuthClient() throws Exception { + logger.info("-- testHttpAuthClient() --"); + + final HttpClient client = HttpClient.factory.build("testHttpAuthClient", false, false, + "localhost", 6667, "junit/echo", "x", "y", true); + final Response response = client.get("hello"); + final String body = HttpClient.getBody(response, String.class); + + assertTrue(response.getStatus() == 200); + assertTrue(body.equals("hello")); + } + + @Test + public void testHttpAuthClient401() throws Exception { + logger.info("-- testHttpAuthClient401() --"); + + final HttpClient client = HttpClient.factory.build("testHttpAuthClient401", false, false, + "localhost", 6667, "junit/echo", null, null, true); + final Response response = client.get("hello"); + assertTrue(response.getStatus() == 401); + } + + @Test + public void testHttpAuthClientProps() throws Exception { + logger.info("-- testHttpAuthClientProps() --"); + + final Properties httpProperties = new Properties(); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty( + PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, + RestMockHealthCheck.class.getName()); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty( + PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, + RestMockHealthCheck.class.getName()); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pap/test"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pdp"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123"); + httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true"); + + final ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties); + assertTrue(servers.size() == 2); + + final ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties); + assertTrue(clients.size() == 2); + + for (final HttpServletServer server : servers) { + server.waitedStart(10000); } + final HttpClient clientPAP = HttpClient.factory.get("PAP"); + final Response response = clientPAP.get(); + assertTrue(response.getStatus() == 200); + + final HttpClient clientPDP = HttpClient.factory.get("PDP"); + final Response response2 = clientPDP.get("test"); + assertTrue(response2.getStatus() == 500); + } + } diff --git a/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java b/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java index 5ac17253..e217ee7d 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java +++ b/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java @@ -164,7 +164,7 @@ public class FileSystemPersistence implements SystemPersistence { this.backupController(controllerName); } } catch (final Exception e) { - logger.info("{}: no existing {} properties", this, controllerName); + logger.info("{}: no existing {} properties {}", this, controllerName, e); // continue } } diff --git a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java index ffcb35cd..48eedfa5 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java +++ b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java @@ -49,6 +49,7 @@ import org.onap.policy.drools.event.comm.TopicSink; import org.onap.policy.drools.event.comm.TopicSource; import org.onap.policy.drools.event.comm.bus.DmaapTopicSink; import org.onap.policy.drools.event.comm.bus.DmaapTopicSource; +import org.onap.policy.drools.event.comm.bus.NoopTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSink; import org.onap.policy.drools.event.comm.bus.UebTopicSource; import org.onap.policy.drools.features.PolicyControllerFeatureAPI; @@ -1645,7 +1646,7 @@ public class RestManager { @Path("engine/topics/sinks/ueb") @ApiOperation(value = "Retrieves the UEB managed topic sinks", notes = "UEB Topic Sinks Agregation", responseContainer = "List", - response = UebTopicSource.class) + response = UebTopicSink.class) public Response uebSinks() { return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getUebTopicSinks()) .build(); @@ -1763,6 +1764,36 @@ public class RestManager { } @GET + @Path("engine/topics/sinks/noop") + @ApiOperation(value = "Retrieves the NOOP managed topic sinks", + notes = "NOOP Topic Sinks Agregation", responseContainer = "List", + response = NoopTopicSink.class) + public Response noopSinks() { + return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getNoopTopicSinks()) + .build(); + } + + @GET + @Path("engine/topics/sinks/noop/{topic}") + @ApiOperation(value = "Retrieves a NOOP managed topic sink", + notes = "NOOP is an dev/null Network Communicaton Sink", response = NoopTopicSink.class) + public Response noopSinkTopic( + @ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) { + return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getNoopTopicSink(topic)) + .build(); + } + + @GET + @Path("engine/topics/sinks/noop/{topic}/events") + @ApiOperation(value = "Retrieves the latest events send through a NOOP topic", + notes = "NOOP is an dev/null Network Communicaton Sink", responseContainer = "List") + public Response noopSinkEvents(@PathParam("topic") String topic) { + return Response.status(Status.OK) + .entity(Arrays.asList(TopicEndpoint.manager.getNoopTopicSink(topic).getRecentEvents())) + .build(); + } + + @GET @Path("engine/topics/sources/ueb/{topic}/switches") @ApiOperation(value = "UEB Topic Control Switches", notes = "List of the UEB Topic Control Switches", responseContainer = "List") diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java index a262352b..b2b2df6d 100644 --- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java +++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java @@ -760,7 +760,7 @@ class PolicyEngineManager implements PolicyEngine { for (final HttpServletServer httpServer : this.httpServers) { try { - if (!httpServer.waitedStart(5 * 1000L)) + if (!httpServer.waitedStart(10 * 1000L)) success = false; } catch (final Exception e) { logger.error("{}: cannot start http-server {} because of {}", this, httpServer, @@ -998,6 +998,7 @@ class PolicyEngineManager implements PolicyEngine { Thread.sleep(5000L); } catch (final InterruptedException e) { logger.warn("{}: interrupted-exception while shutting down management server: ", this); + Thread.currentThread().interrupt(); } System.exit(0); diff --git a/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java b/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java index f6c837f5..bd5b8aac 100644 --- a/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java +++ b/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,16 +20,51 @@ package org.onap.policy.drools.utils; +import java.io.IOException; +import java.net.ConnectException; +import java.net.Socket; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Network Utilities */ public class NetworkUtil { - - /** - * IPv4 Wildcard IP address - */ - public static final String IPv4_WILDCARD_ADDRESS = "0.0.0.0"; - - /* Other methods will be added as needed */ + + public static final Logger logger = LoggerFactory.getLogger(NetworkUtil.class.getName()); + + /** + * IPv4 Wildcard IP address + */ + public static final String IPv4_WILDCARD_ADDRESS = "0.0.0.0"; + + + /** + * try to connect to $host:$port $retries times while we are getting connection failures. + * + * @param host host + * @param port port + * @param retries number of attempts + * @return true is port is open, false otherwise + * @throws InterruptedException if execution has been interrupted + */ + public static boolean isTcpPortOpen(String host, int port, int retries, long interval) + throws InterruptedException, IOException { + int retry = 0; + while (retry < retries) { + try (Socket s = new Socket(host, port)) { + logger.debug("{}:{} connected - retries={} interval={}", host, port, retries, interval); + return true; + } catch (final ConnectException e) { + retry++; + logger.trace("{}:{} connected - retries={} interval={}", host, port, retries, interval, e); + Thread.sleep(interval); + } + } + + logger.warn("{}:{} closed = retries={} interval={}", host, port, retries, interval); + return false; + } } diff --git a/policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java b/policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java new file mode 100644 index 00000000..1af831ad --- /dev/null +++ b/policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java @@ -0,0 +1,65 @@ +/*- + * ============LICENSE_START======================================================= + * policy-utils + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.utils; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class PairTripleTest { + + @Test + public void pairTest() { + Pair<String, String> p = new Pair<String, String>("foo", "bar"); + + assertEquals(p.first(),"foo"); + assertEquals(p.second(),"bar"); + assertEquals(p.getFirst(),"foo"); + assertEquals(p.getSecond(),"bar"); + + p.first("one"); + p.second("two"); + + assertEquals(p.first(),"one"); + assertEquals(p.second(),"two"); + assertEquals(p.getFirst(),"one"); + assertEquals(p.getSecond(),"two"); + + } + + @Test + public void tripleTest() { + Triple<String, String, String> t = new Triple<String, String,String>("foo", "bar", "fiz"); + + assertEquals(t.first(),"foo"); + assertEquals(t.second(),"bar"); + assertEquals(t.third(),"fiz"); + + t.first("one"); + t.second("two"); + t.third("three"); + + assertEquals(t.first(),"one"); + assertEquals(t.second(),"two"); + assertEquals(t.third(),"three"); + } + +} diff --git a/policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java b/policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java new file mode 100644 index 00000000..ab4bace5 --- /dev/null +++ b/policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * policy-utils + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.policy.drools.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import org.junit.Test; + +public class ReflectionUtilTest { + + public class ParentClass { + + } + + public class ChildClass extends ParentClass{ + + } + + @Test + public void reflectionTest() { + + try { + + Class<?> class1 = Class.forName("org.onap.policy.drools.utils.ReflectionUtil"); + + ClassLoader classLoader = class1.getClassLoader(); + + Class<?> class2 = ReflectionUtil.fetchClass(classLoader, "org.onap.policy.drools.utils.ReflectionUtil"); + + + assertTrue(ReflectionUtil.isClass(classLoader, "org.onap.policy.drools.utils.ReflectionUtil")); + assertEquals(class1,class2); + assertTrue(ReflectionUtil.isSubclass(ParentClass.class, ChildClass.class)); + assertFalse(ReflectionUtil.isSubclass(ChildClass.class, ParentClass.class)); + + + } catch (ClassNotFoundException e) { + fail(); + } + } + +} @@ -43,9 +43,9 @@ <common-modules.version>1.1.0-SNAPSHOT</common-modules.version> <dmaap.version>0.2.12</dmaap.version> <cambria.version>0.0.1</cambria.version> - <jersey.version>2.22.2</jersey.version> - <jersey.swagger.version>1.5.13</jersey.swagger.version> - <jackson.version>2.8.4</jackson.version> + <jersey.version>2.25.1</jersey.version> + <jersey.swagger.version>1.5.16</jersey.swagger.version> + <jackson.version>2.9.1</jackson.version> <http.client.version>4.5.2</http.client.version> <http.core.version>4.4.4</http.core.version> <logback.version>1.2.3</logback.version> @@ -55,7 +55,7 @@ <hibernate.core.version>5.2.10.Final</hibernate.core.version> <hibernate.commons.annotations.version>5.0.1.Final</hibernate.commons.annotations.version> <commons.io.version>2.5</commons.io.version> - <guava.version>16.0.1</guava.version> + <guava.version>23.0</guava.version> <nexusproxy>https://nexus.onap.org</nexusproxy> <sitePath>/content/sites/site/${project.groupId}/${project.artifactId}/${project.version}</sitePath> @@ -73,6 +73,8 @@ <module>feature-eelf</module> <module>feature-session-persistence</module> <module>feature-test-transaction</module> + <module>api-state-management</module> + <module>feature-state-management</module> <module>packages</module> </modules> |