aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore15
-rw-r--r--api-state-management/pom.xml62
-rw-r--r--api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java182
-rw-r--r--feature-session-persistence/src/assembly/assemble_zip.xml2
-rw-r--r--feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql6
-rw-r--r--feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java12
-rw-r--r--feature-state-management/pom.xml137
-rw-r--r--feature-state-management/src/assembly/assemble_zip.xml76
-rw-r--r--feature-state-management/src/main/feature/config/feature-state-management.properties82
-rw-r--r--feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql74
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java218
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java398
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java110
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java552
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java275
-rw-r--r--feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java64
-rw-r--r--feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI1
-rw-r--r--feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI1
-rw-r--r--feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI1
-rw-r--r--feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java245
-rw-r--r--feature-state-management/src/test/resources/META-INF/persistence.xml39
-rw-r--r--feature-state-management/src/test/resources/feature-state-management.properties74
-rw-r--r--feature-state-management/src/test/resources/logback-test.xml47
-rw-r--r--packages/install/pom.xml6
-rw-r--r--policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java11
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java1207
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java365
-rw-r--r--policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java827
-rw-r--r--policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java339
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java2
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java33
-rw-r--r--policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java3
-rw-r--r--policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java53
-rw-r--r--policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java65
-rw-r--r--policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java62
-rw-r--r--pom.xml10
36 files changed, 4247 insertions, 1409 deletions
diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index 30e60521..00000000
--- a/.gitignore
+++ /dev/null
@@ -1,15 +0,0 @@
-.DS_Store
-.project
-.settings
-.classpath
-.jupiter
-.pydevproject
-*.swp
-*.log
-*.out
-.metadata/
-target/
-*/logs/
-*/sql/
-*/testingLogs/
-*/config/
diff --git a/api-state-management/pom.xml b/api-state-management/pom.xml
new file mode 100644
index 00000000..f5c1e21e
--- /dev/null
+++ b/api-state-management/pom.xml
@@ -0,0 +1,62 @@
+<!--
+ ============LICENSE_START=======================================================
+ ONAP Policy Engine - Drools PDP
+ ================================================================================
+ Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>drools-pdp</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>api-state-management</artifactId>
+
+ <name>api-state-management</name>
+ <description>Separately loadable module for state management APIe</description>
+
+ <properties>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+ <swagger.version>1.5.0</swagger.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <!-- none -->
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>policy-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.policy.common</groupId>
+ <artifactId>integrity-monitor</artifactId>
+ <version>${common-modules.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java b/api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java
new file mode 100644
index 00000000..a6d808ca
--- /dev/null
+++ b/api-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeatureAPI.java
@@ -0,0 +1,182 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-core
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import java.util.Observer;
+
+import org.onap.policy.common.im.StandbyStatusException;
+import org.onap.policy.common.im.StateManagement;
+import org.onap.policy.drools.properties.Lockable;
+import org.onap.policy.drools.utils.OrderedService;
+import org.onap.policy.drools.utils.OrderedServiceImpl;
+
+/**
+ * This interface provides a way to invoke optional features at various
+ * points in the code. At appropriate points in the
+ * application, the code iterates through this list, invoking these optional
+ * methods. Most of the methods here are notification only -- these tend to
+ * return a 'void' value. In other cases, such as 'activatePolicySession',
+ * may
+ */
+public interface StateManagementFeatureAPI extends OrderedService, Lockable
+{
+
+ public static final String LOCKED = StateManagement.LOCKED;
+ public static final String UNLOCKED = StateManagement.UNLOCKED;
+ public static final String ENABLED = StateManagement.ENABLED;
+ public static final String DISABLED = StateManagement.DISABLED;
+ public static final String ENABLE_NOT_FAILED = StateManagement.ENABLE_NOT_FAILED;
+ public static final String DISABLE_FAILED = StateManagement.DISABLE_FAILED;
+ public static final String FAILED = StateManagement.FAILED;
+ public static final String DEPENDENCY = StateManagement.DEPENDENCY;
+ public static final String DEPENDENCY_FAILED = StateManagement.DEPENDENCY_FAILED;
+ public static final String DISABLE_DEPENDENCY = StateManagement.DISABLE_DEPENDENCY;
+ public static final String ENABLE_NO_DEPENDENCY = StateManagement.ENABLE_NO_DEPENDENCY;
+ public static final String NULL_VALUE = StateManagement.NULL_VALUE;
+ public static final String LOCK = StateManagement.LOCK;
+ public static final String UNLOCK = StateManagement.UNLOCK;
+ public static final String PROMOTE = StateManagement.PROMOTE;
+ public static final String DEMOTE = StateManagement.DEMOTE;
+ public static final String HOT_STANDBY = StateManagement.HOT_STANDBY;
+ public static final String COLD_STANDBY = StateManagement.COLD_STANDBY;
+ public static final String PROVIDING_SERVICE = StateManagement.PROVIDING_SERVICE;
+
+ public static final String ADMIN_STATE = StateManagement.ADMIN_STATE;
+ public static final String OPERATION_STATE = StateManagement.OPERATION_STATE;
+ public static final String AVAILABLE_STATUS= StateManagement.AVAILABLE_STATUS;
+ public static final String STANDBY_STATUS = StateManagement.STANDBY_STATUS;
+
+ public static final int SEQ_NUM = 0;
+ /**
+ * 'FeatureAPI.impl.getList()' returns an ordered list of objects
+ * implementing the 'FeatureAPI' interface.
+ */
+ static public OrderedServiceImpl<StateManagementFeatureAPI> impl =
+ new OrderedServiceImpl<StateManagementFeatureAPI>(StateManagementFeatureAPI.class);
+
+ /**
+ * This method is called to add an Observer to receive notifications of state changes
+ *
+ * @param stateChangeObserver
+ */
+ public void addObserver(Observer stateChangeObserver);
+
+ /**
+ * This method returns the X.731 Administrative State for this resource
+ *
+ * @return String (locked, unlocked)
+ */
+ public String getAdminState();
+
+ /**
+ * This method returns the X.731 Operational State for this resource
+ *
+ * @return String (enabled, disabled)
+ */
+ public String getOpState();
+
+ /**
+ * This method returns the X.731 Availability Status for this resource
+ *
+ * @return String (failed; dependency; dependency,failed)
+ */
+ public String getAvailStatus();
+
+ /**
+ * This method returns the X.731 Standby Status for this resource
+ *
+ * @return String (providingservice, hotstandby or coldstandby)
+ */
+ public String getStandbyStatus();
+
+ /**
+ * This method returns the X.731 Standby Status for the named resource
+ * @param String (resourceName)
+ * @return String (providingservice, hotstandby or coldstandby)
+ */
+ public String getStandbyStatus(String resourceName);
+
+ /**
+ * This method moves the X.731 Operational State for the named resource
+ * into a value of disabled and the Availability Status to a value of failed.
+ * As a consequence the Standby Status value will take a value of coldstandby.
+ *
+ * @param String (resourceName)
+ * @throws Exception
+ */
+ public void disableFailed(String resourceName) throws Exception;
+
+ /**
+ * This method moves the X.731 Operational State for this resource
+ * into a value of disabled and the Availability Status to a value of failed.
+ * As a consequence the Standby Status value will take a value of coldstandby.
+ *
+ * @param String (resourceName)
+ * @throws Exception
+ */
+ public void disableFailed() throws Exception;
+
+ /**
+ * This method moves the X.731 Standby Status for this resource from hotstandby
+ * to providingservice. If the current value is coldstandby, no change is made.
+ * If the current value is null, it will move to providingservice assuming the
+ * Operational State is enabled and Administrative State is unlocked.
+ * @throws Exception
+ * @throws StandbyStatusException
+ */
+ public void promote() throws StandbyStatusException, Exception;
+
+ /**
+ * This method moves the X.731 Standby Status for this resource from providingservice
+ * to hotstandby. If the current value is null, it will move to hotstandby assuming the
+ * Operational State is enabled and Administrative State is unlocked. Else, it will move
+ * to coldstandby
+ * @throws Exception
+ */
+ public void demote() throws Exception;
+
+ /**
+ * This method returns the resourceName associated with this instance of the StateManagementFeature
+ * @return String (resourceName)
+ */
+ public String getResourceName();
+
+ /**
+ * This Lockable method will lock the StateManagement object Admin state
+ * @return true if successfull, false otherwise
+ */
+ @Override
+ public boolean lock();
+
+ /**
+ * This Lockable method will unlock the StateManagement object Admin state
+ * @return true if successfull, false otherwise
+ */
+ @Override
+ public boolean unlock();
+
+ /**
+ * This Lockable method indicates the Admin state StateManagement object
+ * @return true if locked, false otherwise
+ */
+ @Override
+ public boolean isLocked();
+}
diff --git a/feature-session-persistence/src/assembly/assemble_zip.xml b/feature-session-persistence/src/assembly/assemble_zip.xml
index 1cc3ce5a..8a315960 100644
--- a/feature-session-persistence/src/assembly/assemble_zip.xml
+++ b/feature-session-persistence/src/assembly/assemble_zip.xml
@@ -24,7 +24,7 @@
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
- <id>session-persistence</id>
+ <id>feature-session-persistence</id>
<formats>
<format>zip</format>
</formats>
diff --git a/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql b/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql
index 0e980968..5d4993fe 100644
--- a/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql
+++ b/feature-session-persistence/src/main/feature/db/sessionpersistence/sql/18020-sessionpersistence.upgrade.sql
@@ -51,10 +51,4 @@ WORKITEMBYTEARRAY BLOB,
PRIMARY KEY (WORKITEMID)
);
-CREATE TABLE IF NOT EXISTS sessionpersistence.SESSIONINFO_ID_SEQ (next_val bigint) engine=MyISAM;
-INSERT INTO sessionpersistence.SESSIONINFO_ID_SEQ (next_val) SELECT 1 WHERE NOT EXISTS (SELECT * FROM sessionpersistence.SESSIONINFO_ID_SEQ);
-
-CREATE TABLE IF NOT EXISTS sessionpersistence.WORKITEMINFO_ID_SEQ (next_val bigint) engine=MyISAM;
-INSERT INTO sessionpersistence.WORKITEMINFO_ID_SEQ (next_val) SELECT 1 WHERE NOT EXISTS (SELECT * FROM sessionpersistence.WORKITEMINFO_ID_SEQ);
-
set foreign_key_checks=1; \ No newline at end of file
diff --git a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java
index e6603b68..b48690b0 100644
--- a/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java
+++ b/feature-session-persistence/src/main/java/org/onap/policy/drools/persistence/PersistenceFeature.java
@@ -732,7 +732,7 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine
* before firing rules again. This is a "half" time, so that
* we can multiply it by two without overflowing the word size.
*/
- long halfMaxSleepTime = 5000 / 2;
+ long halfMaxSleepTime = 5000L / 2L;
/**
* Constructor - initialize variables and create thread
@@ -819,6 +819,7 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine
} catch (InterruptedException e) {
logger.error("stopThread exception: ", e);
+ Thread.currentThread().interrupt();
}
// verify that it's done
@@ -866,11 +867,9 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine
// no rules fired -- increase poll delay
sleepTime = 2 * Math.min(halfMaxSleepTime, sleepTime);
}
-
- } catch (Throwable e) {
- logger.error("startThread exception: ", e);
- }
-
+ } catch (Exception | LinkageError e) {
+ logger.error("Exception during kieSession.fireAllRules", e);
+ }
try {
if(stopped.await(sleepTime, TimeUnit.MILLISECONDS)) {
@@ -879,6 +878,7 @@ public class PersistenceFeature implements PolicySessionFeatureAPI, PolicyEngine
} catch (InterruptedException e) {
logger.error("startThread exception: ", e);
+ Thread.currentThread().interrupt();
break;
}
}
diff --git a/feature-state-management/pom.xml b/feature-state-management/pom.xml
new file mode 100644
index 00000000..5265cdbb
--- /dev/null
+++ b/feature-state-management/pom.xml
@@ -0,0 +1,137 @@
+<!--
+ ============LICENSE_START=======================================================
+ feature-state-management
+ ================================================================================
+ Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>drools-pdp</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>feature-state-management</artifactId>
+
+ <name>feature-state-management</name>
+ <description>Separately loadable module for State Management</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>zipfile</id>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <attach>true</attach>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <descriptors>
+ <descriptor>src/assembly/assemble_zip.xml</descriptor>
+ </descriptors>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.8</version>
+ <executions>
+ <execution>
+ <id>copy-dependencies</id>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <phase>prepare-package</phase>
+ <configuration>
+ <transitive>false</transitive>
+ <outputDirectory>${project.build.directory}/assembly/lib</outputDirectory>
+ <overWriteReleases>false</overWriteReleases>
+ <overWriteSnapshots>true</overWriteSnapshots>
+ <overWriteIfNewer>true</overWriteIfNewer>
+ <useRepositoryLayout>false</useRepositoryLayout>
+ <addParentPoms>false</addParentPoms>
+ <copyPom>false</copyPom>
+ <includeScope>runtime</includeScope>
+ <excludeTransitive>true</excludeTransitive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-jersey2-jaxrs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>policy-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>policy-management</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>api-state-management</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.h2database</groupId>
+ <artifactId>h2</artifactId>
+ <version>[1.4.186,)</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.persistence</groupId>
+ <artifactId>eclipselink</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Need to pull in to assembly -->
+ <dependency>
+ <groupId>org.onap.policy.common</groupId>
+ <artifactId>integrity-monitor</artifactId>
+ <version>${common-modules.version}</version>
+ </dependency>
+ <!-- Need to pull into assembly for IntegrityMonitor -->
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/feature-state-management/src/assembly/assemble_zip.xml b/feature-state-management/src/assembly/assemble_zip.xml
new file mode 100644
index 00000000..f398829d
--- /dev/null
+++ b/feature-state-management/src/assembly/assemble_zip.xml
@@ -0,0 +1,76 @@
+<!--
+ ============LICENSE_START=======================================================
+ feature-state-management
+ ================================================================================
+ Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<!-- Defines how we build the .zip file which is our distribution. -->
+
+<assembly
+ xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+ <id>feature-state-management</id>
+ <formats>
+ <format>zip</format>
+ </formats>
+
+ <!-- we want "system" and related files right at the root level as this
+ file is suppose to be unzip on top of a karaf distro. -->
+ <includeBaseDirectory>false</includeBaseDirectory>
+
+ <fileSets>
+ <fileSet>
+ <directory>target</directory>
+ <outputDirectory>lib/feature</outputDirectory>
+ <includes>
+ <include>feature-state-management-${project.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>target/assembly/lib</directory>
+ <outputDirectory>lib/dependencies</outputDirectory>
+ <includes>
+ <include>*.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/config</directory>
+ <outputDirectory>config</outputDirectory>
+ <fileMode>0644</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/bin</directory>
+ <outputDirectory>bin</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/db</directory>
+ <outputDirectory>db</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/feature/install</directory>
+ <outputDirectory>install</outputDirectory>
+ <fileMode>0744</fileMode>
+ <excludes/>
+ </fileSet>
+ </fileSets>
+</assembly>
diff --git a/feature-state-management/src/main/feature/config/feature-state-management.properties b/feature-state-management/src/main/feature/config/feature-state-management.properties
new file mode 100644
index 00000000..72c1fe22
--- /dev/null
+++ b/feature-state-management/src/main/feature/config/feature-state-management.properties
@@ -0,0 +1,82 @@
+###
+# ============LICENSE_START=======================================================
+# feature-state-management
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+# DB properties
+javax.persistence.jdbc.driver=org.mariadb.jdbc.Driver
+javax.persistence.jdbc.url=jdbc:mariadb://${{SQL_HOST}}:3306/statemanagement
+javax.persistence.jdbc.user=${{SQL_USER}}
+javax.persistence.jdbc.password=${{SQL_PASSWORD}}
+
+# DroolsPDPIntegrityMonitor Properties
+hostPort=0.0.0.0:57692
+
+#IntegrityMonitor Properties
+
+# Must be unique across the system
+resource.name=pdp1
+# Name of the site in which this node is hosted
+site_name=site1
+# Forward Progress Monitor update interval seconds
+fp_monitor_interval=30
+# Failed counter threshold before failover
+failed_counter_threshold=3
+# Interval between test transactions when no traffic seconds
+test_trans_interval=10
+# Interval between writes of the FPC to the DB seconds
+write_fpc_interval=5
+# Node type Note: Make sure you don't leave any trailing spaces, or you'll get an 'invalid node type' error!
+node_type=pdp_drools
+# Dependency groups are groups of resources upon which a node operational state is dependent upon.
+# Each group is a comma-separated list of resource names and groups are separated by a semicolon. For example:
+# dependency_groups=site_1.astra_1,site_1.astra_2;site_1.brms_1,site_1.brms_2;site_1.logparser_1;site_1.pypdp_1
+dependency_groups=
+# When set to true, dependent health checks are performed by using JMX to invoke test() on the dependent.
+# The default false is to use state checks for health.
+test_via_jmx=true
+# This is the max number of seconds beyond which a non incrementing FPC is considered a failure
+max_fpc_update_interval=120
+# Run the state audit every 60 seconds (60000 ms). The state audit finds stale DB entries in the
+# forwardprogressentity table and marks the node as disabled/failed in the statemanagemententity
+# table. NOTE! It will only run on nodes that have a standbystatus = providingservice.
+# A value of <= 0 will turn off the state audit.
+state_audit_interval_ms=60000
+# The refresh state audit is run every (default) 10 minutes (600000 ms) to clean up any state corruption in the
+# DB statemanagemententity table. It only refreshes the DB state entry for the local node. That is, it does not
+# refresh the state of any other nodes. A value <= 0 will turn the audit off. Any other value will override
+# the default of 600000 ms.
+refresh_state_audit_interval_ms=600000
+
+
+# Repository audit properties
+
+# Assume it's the releaseRepository that needs to be audited,
+# because that's the one BRMGW will publish to.
+repository.audit.id=${{releaseRepositoryID}}
+repository.audit.url=${{releaseRepositoryUrl}}
+repository.audit.username=${{repositoryUsername}}
+repository.audit.password=${{repositoryPassword}}
+# Flag to control the execution of the subsystemTest for the Nexus Maven repository
+repository.audit.is.active=false
+repository.audit.ignore.errors=true
+
+# DB Audit Properties
+
+# Flag to control the execution of the subsystemTest for the Database
+db.audit.is.active=false \ No newline at end of file
diff --git a/feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql b/feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql
new file mode 100644
index 00000000..f73f992b
--- /dev/null
+++ b/feature-state-management/src/main/feature/db/statemanagement/sql/18020-statemanagement.upgrade.sql
@@ -0,0 +1,74 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+set foreign_key_checks=0;
+
+CREATE TABLE if not exists statemanagement.StateManagementEntity
+(
+ id int not null auto_increment,
+ resourceName varchar(100) not null,
+ adminState varchar(20) not null,
+ opstate varchar(20) not null,
+ availStatus varchar(20),
+ standbyStatus varchar(20),
+ created_date timestamp not null default current_timestamp,
+ modifiedDate timestamp not null,
+ primary key(id),
+ unique key resource(resourceName)
+);
+
+CREATE TABLE if not exists statemanagement.ResourceRegistrationEntity
+(
+ resourceRegistrationId bigint not null auto_increment,
+ resourceName varchar(100) not null,
+ resourceURL varchar(255) not null,
+ site varchar(50),
+ nodetype varchar(50),
+ created_date timestamp not null default current_timestamp,
+ last_updated timestamp not null,
+ primary key (resourceRegistrationId),
+ unique key resource (resourceName),
+ unique key id_resource_url (resourceURL)
+);
+
+CREATE TABLE if not exists statemanagement.ForwardProgressEntity
+(
+ forwardProgressId bigint not null auto_increment,
+ resourceName varchar(100) not null,
+ fpc_count bigint not null,
+ created_date timestamp not null default current_timestamp,
+ last_updated timestamp not null,
+ primary key (forwardProgressId),
+ unique key resource_key (resourceName)
+);
+
+CREATE TABLE if not exists statemanagement.sequence
+(
+SEQ_NAME VARCHAR(50) NOT NULL,
+SEQ_COUNT DECIMAL(38,0),
+PRIMARY KEY (SEQ_NAME)
+);
+
+-- Will only insert a record if none exists:
+INSERT INTO statemanagement.SEQUENCE (SEQ_NAME,SEQ_COUNT)
+SELECT * FROM (SELECT 'SEQ_GEN',1) AS tmp
+WHERE NOT EXISTS(select SEQ_NAME from statemanagement.SEQUENCE where SEQ_NAME = 'SEQ_GEN') LIMIT 1;
+
+set foreign_key_checks=1; \ No newline at end of file
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java
new file mode 100644
index 00000000..a86ac8ef
--- /dev/null
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DbAudit.java
@@ -0,0 +1,218 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class audits the database
+ */
+public class DbAudit extends DroolsPDPIntegrityMonitor.AuditBase
+{
+ // get an instance of logger
+ private static Logger logger = LoggerFactory.getLogger(DbAudit.class);
+ // single global instance of this audit object
+ final static private DbAudit instance = new DbAudit();
+
+ // This indicates if 'CREATE TABLE IF NOT EXISTS Audit ...' should be
+ // invoked -- doing this avoids the need to create the table in advance.
+ static private boolean createTableNeeded = true;
+
+ /**
+ * @return the single 'DbAudit' instance
+ */
+ static DroolsPDPIntegrityMonitor.AuditBase getInstance()
+ {
+ return(instance);
+ }
+
+ /**
+ * Constructor - set the name to 'Database'
+ */
+ private DbAudit()
+ {
+ super("Database");
+ }
+
+ /**
+ * Invoke the audit
+ *
+ * @param properties properties to be passed to the audit
+ */
+ @Override
+ public void invoke(Properties properties)
+ {
+ if(logger.isDebugEnabled()){
+ logger.debug("Running 'DbAudit.invoke'");
+ }
+ boolean isActive = true;
+ String dbAuditIsActive = StateManagementProperties.getProperty("db.audit.is.active");
+ if(logger.isDebugEnabled()){
+ logger.debug("DbAudit.invoke: dbAuditIsActive = {}", dbAuditIsActive);
+ }
+
+ if (dbAuditIsActive != null) {
+ try {
+ isActive = Boolean.parseBoolean(dbAuditIsActive.trim());
+ } catch (NumberFormatException e) {
+ logger.warn("DbAudit.invoke: Ignoring invalid property: db.audit.is.active = {}", dbAuditIsActive);
+ }
+ }
+
+ if(!isActive){
+
+ logger.info("DbAudit.invoke: exiting because isActive = {}", isActive);
+ return;
+ }
+
+ // fetch DB properties from properties file -- they are already known
+ // to exist, because they were verified by the 'IntegrityMonitor'
+ // constructor
+ String url = properties.getProperty(StateManagementProperties.DB_URL);
+ String user = properties.getProperty(StateManagementProperties.DB_USER);
+ String password = properties.getProperty(StateManagementProperties.DB_PWD);
+
+ // connection to DB
+ Connection connection = null;
+
+ // supports SQL operations
+ PreparedStatement statement = null;
+ ResultSet rs = null;
+
+ // operation phase currently running -- used to construct an error
+ // message, if needed
+ String phase = null;
+
+ try
+ {
+ // create connection to DB
+ phase = "creating connection";
+ if(logger.isDebugEnabled()){
+ logger.debug("DbAudit: Creating connection to {}", url);
+ }
+
+ connection = DriverManager.getConnection(url, user, password);
+
+ // create audit table, if needed
+ if (createTableNeeded)
+ {
+ phase = "create table";
+ if(logger.isDebugEnabled()){
+ logger.info("DbAudit: Creating 'Audit' table, if needed");
+ }
+ statement = connection.prepareStatement
+ ("CREATE TABLE IF NOT EXISTS Audit (\n"
+ + " name varchar(64) DEFAULT NULL,\n"
+ + " UNIQUE KEY name (name)\n"
+ + ") DEFAULT CHARSET=latin1;");
+ statement.execute();
+ statement.close();
+ createTableNeeded = false;
+ }
+
+ // insert an entry into the table
+ phase = "insert entry";
+ String key = UUID.randomUUID().toString();
+ statement = connection.prepareStatement
+ ("INSERT INTO Audit (name) VALUES (?)");
+ statement.setString(1, key);
+ statement.executeUpdate();
+ statement.close();
+
+ // fetch the entry from the table
+ phase = "fetch entry";
+ statement = connection.prepareStatement
+ ("SELECT name FROM Audit WHERE name = ?");
+ statement.setString(1, key);
+ rs = statement.executeQuery();
+ if (rs.first())
+ {
+ // found entry
+ if(logger.isDebugEnabled()){
+ logger.debug("DbAudit: Found key {}", rs.getString(1));
+ }
+ }
+ else
+ {
+ logger.error
+ ("DbAudit: can't find newly-created entry with key {}", key);
+ setResponse("Can't find newly-created entry");
+ }
+ statement.close();
+
+ // delete entries from table
+ phase = "delete entry";
+ statement = connection.prepareStatement
+ ("DELETE FROM Audit WHERE name = ?");
+ statement.setString(1, key);
+ statement.executeUpdate();
+ statement.close();
+ statement = null;
+ }
+ catch (Exception e)
+ {
+ String message = "DbAudit: Exception during audit, phase = " + phase;
+ logger.error(message, e);
+ setResponse(message);
+ }
+ finally
+ {
+ if (rs != null)
+ {
+ try
+ {
+ rs.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ if (statement != null)
+ {
+ try
+ {
+ statement.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ if (connection != null)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+ }
+}
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java
new file mode 100644
index 00000000..73f6f738
--- /dev/null
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/DroolsPDPIntegrityMonitor.java
@@ -0,0 +1,398 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.onap.policy.common.im.IntegrityMonitor;
+import org.onap.policy.common.im.IntegrityMonitorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.onap.policy.drools.core.PolicyContainer;
+import org.onap.policy.drools.http.server.HttpServletServer;
+import org.onap.policy.drools.properties.Startable;
+import org.onap.policy.drools.utils.PropertyUtil;
+
+/**
+ * This class extends 'IntegrityMonitor' for use in the 'Drools PDP'
+ * virtual machine. The included audits are 'Database' and 'Repository'.
+ */
+public class DroolsPDPIntegrityMonitor extends IntegrityMonitor
+{
+
+ // get an instance of logger
+ private static final Logger logger = LoggerFactory.getLogger(DroolsPDPIntegrityMonitor.class);
+
+ // static global instance
+ static private DroolsPDPIntegrityMonitor im = null;
+
+ // list of audits to run
+ static private AuditBase[] audits =
+ new AuditBase[]{DbAudit.getInstance(), RepositoryAudit.getInstance()};
+
+ static private Properties subsystemTestProperties = null;
+
+ static private final String PROPERTIES_NAME = "feature-state-management.properties";
+ /**
+ * Static initialization -- create Drools Integrity Monitor, and
+ * an HTTP server to handle REST 'test' requests
+ */
+ static public DroolsPDPIntegrityMonitor init(String configDir) throws Exception
+ {
+
+ logger.info("init: Entering and invoking PropertyUtil.getProperties() on '{}'", configDir);
+
+ // read in properties
+ Properties stateManagementProperties =
+ PropertyUtil.getProperties(configDir + "/" + PROPERTIES_NAME);
+
+ subsystemTestProperties = stateManagementProperties;
+
+ // fetch and verify definitions of some properties
+ // (the 'IntegrityMonitor' constructor does some additional verification)
+
+ String resourceName = stateManagementProperties.getProperty("resource.name");
+ String hostPort = stateManagementProperties.getProperty("hostPort");
+ String fpMonitorInterval = stateManagementProperties.getProperty("fp_monitor_interval");
+ String failedCounterThreshold = stateManagementProperties.getProperty("failed_counter_threshold");
+ String testTransInterval = stateManagementProperties.getProperty("test_trans_interval");
+ String writeFpcInterval = stateManagementProperties.getProperty("write_fpc_interval");
+ String siteName = stateManagementProperties.getProperty("site_name");
+ String nodeType = stateManagementProperties.getProperty("node_type");
+ String dependencyGroups = stateManagementProperties.getProperty("dependency_groups");
+ String javaxPersistenceJdbcDriver = stateManagementProperties.getProperty("javax.persistence.jdbc.driver");
+ String javaxPersistenceJdbcUrl = stateManagementProperties.getProperty("javax.persistence.jdbc.url");
+ String javaxPersistenceJdbcUser = stateManagementProperties.getProperty("javax.persistence.jdbc.user");
+ String javaxPersistenceJdbcPassword = stateManagementProperties.getProperty("javax.persistence.jdbc.password");
+
+ if (resourceName == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'resource.name'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'resource.name'"));
+ }
+ if (hostPort == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'hostPort'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'hostPort'"));
+ }
+ if (fpMonitorInterval == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'fp_monitor_interval'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'fp_monitor_interval'"));
+ }
+ if (failedCounterThreshold == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'failed_counter_threshold'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'failed_counter_threshold'"));
+ }
+ if (testTransInterval == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'test_trans_interval'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'test_trans_interval'"));
+ }
+ if (writeFpcInterval == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'write_fpc_interval'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'write_fpc_interval'"));
+ }
+ if (siteName == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'site_name'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'site_name'"));
+ }
+ if (nodeType == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'node_type'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'node_type'"));
+ }
+ if (dependencyGroups == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'dependency_groups'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'dependency_groups'"));
+ }
+ if (javaxPersistenceJdbcDriver == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.driver for xacml DB'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.driver for xacml DB'"));
+ }
+ if (javaxPersistenceJdbcUrl == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.url for xacml DB'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.url for xacml DB'"));
+ }
+ if (javaxPersistenceJdbcUser == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.user for xacml DB'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.user for xacml DB'"));
+ }
+ if (javaxPersistenceJdbcPassword == null)
+ {
+ logger.error("init: Missing IntegrityMonitor property: 'javax.persistence.jbdc.password for xacml DB'");
+ throw(new Exception
+ ("Missing IntegrityMonitor property: 'javax.persistence.jbdc.password' for xacml DB'"));
+ }
+
+ // Now that we've validated the properties, create Drools Integrity Monitor
+ // with these properties.
+ im = new DroolsPDPIntegrityMonitor(resourceName,
+ stateManagementProperties);
+ logger.info("init: New DroolsPDPIntegrityMonitor instantiated, hostPort= {}", hostPort);
+
+ // determine host and port for HTTP server
+ int index = hostPort.lastIndexOf(':');
+ InetSocketAddress addr;
+
+ if (index < 0)
+ {
+ addr = new InetSocketAddress(Integer.valueOf(hostPort));
+ }
+ else
+ {
+ addr = new InetSocketAddress
+ (hostPort.substring(0, index),
+ Integer.valueOf(hostPort.substring(index + 1)));
+ }
+
+ // create http server
+ try {
+ logger.info("init: Starting HTTP server, addr= {}", addr);
+ IntegrityMonitorRestServer server = new IntegrityMonitorRestServer();
+
+ server.init(stateManagementProperties);
+
+ System.out.println("init: Started server on hostPort=" + hostPort);
+ } catch (Exception e) {
+ logger.error("init: Caught Exception attempting to start server on hostPort= {}, message = {}",
+ hostPort, e.getMessage());
+ throw e;
+
+ }
+
+ logger.info("init: Exiting and returning DroolsPDPIntegrityMonitor");
+ return im;
+ }
+
+ /**
+ * Constructor - pass arguments to superclass, but remember properties
+ * @param resourceName unique name of this Integrity Monitor
+ * @param url the JMX URL of the MBean server
+ * @param properties properties used locally, as well as by
+ * 'IntegrityMonitor'
+ * @throws Exception (passed from superclass)
+ */
+ private DroolsPDPIntegrityMonitor(String resourceName,
+ Properties consolidatedProperties
+ ) throws Exception {
+ super(resourceName, consolidatedProperties);
+ }
+
+ /**
+ * Run tests (audits) unique to Drools PDP VM (Database + Repository)
+ */
+ @Override
+ public void subsystemTest() throws IntegrityMonitorException
+ {
+ logger.info("DroolsPDPIntegrityMonitor.subsystemTest called");
+
+ // clear all responses (non-null values indicate an error)
+ for (AuditBase audit : audits)
+ {
+ audit.setResponse(null);
+ }
+
+ // invoke all of the audits
+ for (AuditBase audit : audits)
+ {
+ try
+ {
+ // invoke the audit (responses are stored within the audit object)
+ audit.invoke(subsystemTestProperties);
+ }
+ catch (Exception e)
+ {
+ logger.error("{} audit error", audit.getName(), e);
+ if (audit.getResponse() == null)
+ {
+ // if there is no current response, use the exception message
+ audit.setResponse(e.getMessage());
+ }
+ }
+ }
+
+ // will contain list of subsystems where the audit failed
+ String responseMsg = "";
+
+ // Loop through all of the audits, and see which ones have failed.
+ // NOTE: response information is stored within the audit objects
+ // themselves -- only one can run at a time.
+ for (AuditBase audit : audits)
+ {
+ String response = audit.getResponse();
+ if (response != null)
+ {
+ // the audit has failed -- add subsystem and
+ // and 'responseValue' with the new information
+ responseMsg = responseMsg.concat("\n" + audit.getName() + ": " + response);
+ }
+ }
+
+ if(!responseMsg.isEmpty()){
+ throw new IntegrityMonitorException(responseMsg);
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This is the base class for audits invoked in 'subsystemTest'
+ */
+ static public abstract class AuditBase
+ {
+ // name of the audit
+ protected String name;
+
+ // non-null indicates the error response
+ protected String response;
+
+ /**
+ * Constructor - initialize the name, and clear the initial response
+ * @param name name of the audit
+ */
+ public AuditBase(String name)
+ {
+ this.name = name;
+ this.response = null;
+ }
+
+ /**
+ * @return the name of this audit
+ */
+ public String getName()
+ {
+ return(name);
+ }
+
+ /**
+ * @return the response String (non-null indicates the error message)
+ */
+ public String getResponse()
+ {
+ return(response);
+ }
+
+ /**
+ * Set the response string to the specified value
+ * @param value the new value of the response string (null = no errors)
+ */
+ public void setResponse(String value)
+ {
+ response = value;
+ }
+
+ /**
+ * Abstract method to invoke the audit
+ * @param persistenceProperties Used for DB access
+ * @throws Exception passed in by the audit
+ */
+ abstract void invoke(Properties persistenceProperties) throws Exception;
+ }
+
+ public static class IntegrityMonitorRestServer implements Startable {
+ protected volatile HttpServletServer server = null;
+ protected volatile Properties integrityMonitorRestServerProperties = null;
+
+ public void init(Properties props) {
+ this.integrityMonitorRestServerProperties = props;
+ this.start();
+ }
+
+ @Override
+ public boolean start() throws IllegalStateException {
+ try {
+ ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(integrityMonitorRestServerProperties);
+
+ if (!servers.isEmpty()) {
+ server = servers.get(0);
+
+ try {
+ server.waitedStart(5);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ } catch (Exception e) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean stop() throws IllegalStateException {
+ try {
+ server.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ return true;
+ }
+
+ @Override
+ public void shutdown() throws IllegalStateException {
+ this.stop();
+ }
+
+ @Override
+ public synchronized boolean isAlive() {
+ return this.integrityMonitorRestServerProperties != null;
+ }
+ }
+
+ public static DroolsPDPIntegrityMonitor getInstance() throws Exception{
+ if(logger.isDebugEnabled()){
+ logger.debug("getInstance() called");
+ }
+ if (im == null) {
+ String msg = "No DroolsPDPIntegrityMonitor instance exists."
+ + " Please use the method DroolsPDPIntegrityMonitor init(String configDir)";
+ throw new Exception(msg);
+ }else{
+ return im;
+ }
+ }
+}
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java
new file mode 100644
index 00000000..f5024299
--- /dev/null
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/IntegrityMonitorRestManager.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+@Api(value = "test")
+ @Path("/")
+public class IntegrityMonitorRestManager {
+ private static Logger logger = LoggerFactory.getLogger(IntegrityMonitorRestManager.class);
+ private DroolsPDPIntegrityMonitor im;
+
+ /**
+ * Test interface for Integrity Monitor
+ *
+ * @return Exception message if exception, otherwise empty
+ */
+ @ApiOperation(
+ value = "Test endpoint for integrity monitor",
+ notes = "The TEST command is used to request data from a subcomponent "
+ + "instance that can be used to determine its operational state. "
+ + "A 200/success response status code should be returned if the "
+ + "subcomponent instance is functioning properly and able to respond to requests.",
+ response = String.class)
+ @ApiResponses(value = {
+ @ApiResponse(
+ code = 200,
+ message = "Integrity monitor sanity check passed"),
+ @ApiResponse(
+ code = 500,
+ message = "Integrity monitor sanity check encountered an exception. This can indicate operational state disabled or administrative state locked")
+ })
+ @GET
+ @Path("test")
+ public Response test() {
+ logger.error("integrity monitor /test accessed");
+ // The responses are stored within the audit objects, so we need to
+ // invoke the audits and get responses before we handle another
+ // request.
+ synchronized (IntegrityMonitorRestManager.class) {
+ // will include messages associated with subsystem failures
+ StringBuilder body = new StringBuilder();
+
+ // 200=SUCCESS, 500=failure
+ int responseValue = 200;
+
+ if (im == null) {
+ try {
+ im = DroolsPDPIntegrityMonitor.getInstance();
+ } catch (Exception e) {
+ logger.error("IntegrityMonitorRestManager: test() interface caught an exception", e);
+ e.printStackTrace();
+
+ body.append("\nException: " + e + "\n");
+ responseValue = 500;
+ }
+ }
+
+ if (im != null) {
+ try {
+ // call 'IntegrityMonitor.evaluateSanity()'
+ im.evaluateSanity();
+ } catch (Exception e) {
+ // this exception isn't coming from one of the audits,
+ // because those are caught in 'subsystemTest()'
+ logger.error("DroolsPDPIntegrityMonitor.evaluateSanity()", e);
+
+ // include exception in HTTP response
+ body.append("\nException: " + e + "\n");
+ responseValue = 500;
+ }
+ }
+
+ // send response, including the contents of 'body'
+ // (which is empty if everything is successful)
+ if (responseValue == 200)
+ return Response.status(Response.Status.OK).build();
+ else
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(body.toString()).build();
+ }
+ }
+}
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
new file mode 100644
index 00000000..6171572a
--- /dev/null
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/RepositoryAudit.java
@@ -0,0 +1,552 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.LinkedList;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class audits the Maven repository
+ */
+public class RepositoryAudit extends DroolsPDPIntegrityMonitor.AuditBase
+{
+ private static final long DEFAULT_TIMEOUT = 60; // timeout in 60 seconds
+
+ // get an instance of logger
+ private static Logger logger = LoggerFactory.getLogger(RepositoryAudit.class);
+ // single global instance of this audit object
+ static private RepositoryAudit instance = new RepositoryAudit();
+
+ /**
+ * @return the single 'RepositoryAudit' instance
+ */
+ static DroolsPDPIntegrityMonitor.AuditBase getInstance()
+ {
+ return(instance);
+ }
+
+ /**
+ * Constructor - set the name to 'Repository'
+ */
+ private RepositoryAudit()
+ {
+ super("Repository");
+ }
+
+ /**
+ * Invoke the audit
+ *
+ * @param properties properties to be passed to the audit
+ */
+ @Override
+ public void invoke(Properties properties)
+ throws IOException, InterruptedException
+ {
+ if(logger.isDebugEnabled()){
+ logger.debug("Running 'RepositoryAudit.invoke'");
+ }
+
+ boolean isActive = true;
+ boolean ignoreErrors = true; // ignore errors by default
+ String repoAuditIsActive = StateManagementProperties.getProperty("repository.audit.is.active");
+ String repoAuditIgnoreErrors =
+ StateManagementProperties.getProperty("repository.audit.ignore.errors");
+ logger.debug("RepositoryAudit.invoke: repoAuditIsActive = {}"
+ + ", repoAuditIgnoreErrors = {}",repoAuditIsActive, repoAuditIgnoreErrors);
+
+ if (repoAuditIsActive != null) {
+ try {
+ isActive = Boolean.parseBoolean(repoAuditIsActive.trim());
+ } catch (NumberFormatException e) {
+ logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.is.active = {}", repoAuditIsActive);
+ }
+ }
+
+ if(!isActive){
+ logger.info("RepositoryAudit.invoke: exiting because isActive = {}", isActive);
+ return;
+ }
+
+ if (repoAuditIgnoreErrors != null)
+ {
+ try
+ {
+ ignoreErrors = Boolean.parseBoolean(repoAuditIgnoreErrors.trim());
+ }
+ catch (NumberFormatException e)
+ {
+ ignoreErrors = true;
+ logger.warn("RepositoryAudit.invoke: Ignoring invalid property: repository.audit.ignore.errors = {}", repoAuditIgnoreErrors);
+ }
+ }else{
+ ignoreErrors = true;
+ }
+
+ // Fetch repository information from 'IntegrityMonitorProperties'
+ String repositoryId =
+ StateManagementProperties.getProperty("repository.audit.id");
+ String repositoryUrl =
+ StateManagementProperties.getProperty("repository.audit.url");
+ String repositoryUsername =
+ StateManagementProperties.getProperty("repository.audit.username");
+ String repositoryPassword =
+ StateManagementProperties.getProperty("repository.audit.password");
+ boolean upload =
+ (repositoryId != null && repositoryUrl != null
+ && repositoryUsername != null && repositoryPassword != null);
+
+ // used to incrementally construct response as problems occur
+ // (empty = no problems)
+ StringBuilder response = new StringBuilder();
+
+ long timeoutInSeconds = DEFAULT_TIMEOUT;
+ String timeoutString =
+ StateManagementProperties.getProperty("repository.audit.timeout");
+ if (timeoutString != null && !timeoutString.isEmpty())
+ {
+ try
+ {
+ timeoutInSeconds = Long.valueOf(timeoutString);
+ }
+ catch (NumberFormatException e)
+ {
+ logger.error
+ ("RepositoryAudit: Invalid 'repository.audit.timeout' value: '{}'", timeoutString, e);
+ if (!ignoreErrors)
+ {
+ response.append("Invalid 'repository.audit.timeout' value: '")
+ .append(timeoutString).append("'\n");
+ setResponse(response.toString());
+ }
+ }
+ }
+
+ // artifacts to be downloaded
+ LinkedList<Artifact> artifacts = new LinkedList<>();
+
+ /*
+ * 1) create temporary directory
+ */
+ Path dir = Files.createTempDirectory("auditRepo");
+ logger.info("RepositoryAudit: temporary directory = {}", dir);
+
+ // nested 'pom.xml' file and 'repo' directory
+ Path pom = dir.resolve("pom.xml");
+ Path repo = dir.resolve("repo");
+
+ /*
+ * 2) Create test file, and upload to repository
+ * (only if repository information is specified)
+ */
+ String groupId = null;
+ String artifactId = null;
+ String version = null;
+ if (upload)
+ {
+ groupId = "org.onap.policy.audit";
+ artifactId = "repository-audit";
+ version = "0." + System.currentTimeMillis();
+
+ if (repositoryUrl.toLowerCase().contains("snapshot"))
+ {
+ // use SNAPSHOT version
+ version += "-SNAPSHOT";
+ }
+
+ // create text file to write
+ FileOutputStream fos =
+ new FileOutputStream(dir.resolve("repository-audit.txt").toFile());
+ try
+ {
+ fos.write(version.getBytes());
+ }
+ finally
+ {
+ fos.close();
+ }
+
+ // try to install file in repository
+ if (runProcess
+ (timeoutInSeconds, dir.toFile(), null,
+ "mvn", "deploy:deploy-file",
+ "-DrepositoryId=" + repositoryId,
+ "-Durl=" + repositoryUrl,
+ "-Dfile=repository-audit.txt",
+ "-DgroupId=" + groupId,
+ "-DartifactId=" + artifactId,
+ "-Dversion=" + version,
+ "-Dpackaging=txt",
+ "-DgeneratePom=false") != 0)
+ {
+ logger.error
+ ("RepositoryAudit: 'mvn deploy:deploy-file' failed");
+ if (!ignoreErrors)
+ {
+ response.append("'mvn deploy:deploy-file' failed\n");
+ setResponse(response.toString());
+ }
+ }
+ else
+ {
+ logger.info
+ ("RepositoryAudit: 'mvn deploy:deploy-file succeeded");
+
+ // we also want to include this new artifact in the download
+ // test (steps 3 and 4)
+ artifacts.add(new Artifact(groupId, artifactId, version, "txt"));
+ }
+ }
+
+ /*
+ * 3) create 'pom.xml' file in temporary directory
+ */
+ artifacts.add(new Artifact("org.apache.maven/maven-embedder/3.2.2"));
+
+ StringBuilder sb = new StringBuilder();
+ sb.append
+ ("<project xmlns=\"http://maven.apache.org/POM/4.0.0\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"\n"
+ + " xsi:schemaLocation=\"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd\">\n"
+ + "\n"
+ + " <modelVersion>4.0.0</modelVersion>\n"
+ + " <groupId>empty</groupId>\n"
+ + " <artifactId>empty</artifactId>\n"
+ + " <version>1.0-SNAPSHOT</version>\n"
+ + " <packaging>pom</packaging>\n"
+ + "\n"
+ + " <build>\n"
+ + " <plugins>\n"
+ + " <plugin>\n"
+ + " <groupId>org.apache.maven.plugins</groupId>\n"
+ + " <artifactId>maven-dependency-plugin</artifactId>\n"
+ + " <version>2.10</version>\n"
+ + " <executions>\n"
+ + " <execution>\n"
+ + " <id>copy</id>\n"
+ + " <goals>\n"
+ + " <goal>copy</goal>\n"
+ + " </goals>\n"
+ + " <configuration>\n"
+ + " <localRepositoryDirectory>")
+ .append(repo)
+ .append("</localRepositoryDirectory>\n")
+ .append(" <artifactItems>\n");
+ for (Artifact artifact : artifacts)
+ {
+ // each artifact results in an 'artifactItem' element
+ sb.append
+ (" <artifactItem>\n"
+ + " <groupId>")
+ .append(artifact.groupId)
+ .append
+ ("</groupId>\n"
+ + " <artifactId>")
+ .append(artifact.artifactId)
+ .append
+ ("</artifactId>\n"
+ + " <version>")
+ .append(artifact.version)
+ .append
+ ("</version>\n"
+ + " <type>")
+ .append(artifact.type)
+ .append
+ ("</type>\n"
+ + " </artifactItem>\n");
+ }
+ sb.append
+ (" </artifactItems>\n"
+ + " </configuration>\n"
+ + " </execution>\n"
+ + " </executions>\n"
+ + " </plugin>\n"
+ + " </plugins>\n"
+ + " </build>\n"
+ + "</project>\n");
+ FileOutputStream fos = new FileOutputStream(pom.toFile());
+ try
+ {
+ fos.write(sb.toString().getBytes());
+ }
+ finally
+ {
+ fos.close();
+ }
+
+ /*
+ * 4) Invoke external 'mvn' process to do the downloads
+ */
+
+ // output file = ${dir}/out (this supports step '4a')
+ File output = dir.resolve("out").toFile();
+
+ // invoke process, and wait for response
+ int rval = runProcess
+ (timeoutInSeconds, dir.toFile(), output, "mvn", "compile");
+ logger.info("RepositoryAudit: 'mvn' return value = {}", rval);
+ if (rval != 0)
+ {
+ logger.error
+ ("RepositoryAudit: 'mvn compile' invocation failed");
+ if (!ignoreErrors)
+ {
+ response.append("'mvn compile' invocation failed\n");
+ setResponse(response.toString());
+ }
+ }
+
+ /*
+ * 4a) Check attempted and successful downloads from output file
+ * Note: at present, this step just generates log messages,
+ * but doesn't do any verification.
+ */
+ if (rval == 0)
+ {
+ // place output in 'fileContents' (replacing the Return characters
+ // with Newline)
+ byte[] outputData = new byte[(int)output.length()];
+ FileInputStream fis = new FileInputStream(output);
+ fis.read(outputData);
+ String fileContents = new String(outputData).replace('\r','\n');
+ fis.close();
+
+ // generate log messages from 'Downloading' and 'Downloaded'
+ // messages within the 'mvn' output
+ int index = 0;
+ while ((index = fileContents.indexOf("\nDown", index)) > 0)
+ {
+ index += 5;
+ if (fileContents.regionMatches(index, "loading: ", 0, 9))
+ {
+ index += 9;
+ int endIndex = fileContents.indexOf('\n', index);
+ logger.info
+ ("RepositoryAudit: Attempted download: '{}'", fileContents.substring(index, endIndex));
+ index = endIndex;
+ }
+ else if (fileContents.regionMatches(index, "loaded: ", 0, 8))
+ {
+ index += 8;
+ int endIndex = fileContents.indexOf(' ', index);
+ logger.info
+ ("RepositoryAudit: Successful download: '{}'",fileContents.substring(index, endIndex));
+ index = endIndex;
+ }
+ }
+ }
+
+ /*
+ * 5) Check the contents of the directory to make sure the downloads
+ * were successful
+ */
+ for (Artifact artifact : artifacts)
+ {
+ if (repo.resolve(artifact.groupId.replace('.','/'))
+ .resolve(artifact.artifactId)
+ .resolve(artifact.version)
+ .resolve(artifact.artifactId + "-" + artifact.version + "."
+ + artifact.type).toFile().exists())
+ {
+ // artifact exists, as expected
+ logger.info("RepositoryAudit: {} : exists", artifact.toString());
+ }
+ else
+ {
+ // Audit ERROR: artifact download failed for some reason
+ logger.error("RepositoryAudit: {}: does not exist", artifact.toString());
+ if (!ignoreErrors)
+ {
+ response.append("Failed to download artifact: ")
+ .append(artifact).append('\n');
+ setResponse(response.toString());
+ }
+ }
+ }
+
+ /*
+ * 6) Use 'curl' to delete the uploaded test file
+ * (only if repository information is specified)
+ */
+ if (upload)
+ {
+ if (runProcess
+ (timeoutInSeconds, dir.toFile(), null,
+ "curl",
+ "--request", "DELETE",
+ "--user", repositoryUsername + ":" + repositoryPassword,
+ (repositoryUrl + "/" + groupId.replace('.', '/') + "/" +
+ artifactId + "/" + version))
+ != 0)
+ {
+ logger.error
+ ("RepositoryAudit: delete of uploaded artifact failed");
+ if (!ignoreErrors)
+ {
+ response.append("delete of uploaded artifact failed\n");
+ setResponse(response.toString());
+ }
+ }
+ else
+ {
+ logger.info
+ ("RepositoryAudit: delete of uploaded artifact succeeded");
+ artifacts.add(new Artifact(groupId, artifactId, version, "txt"));
+ }
+ }
+
+ /*
+ * 7) Remove the temporary directory
+ */
+ Files.walkFileTree
+ (dir,
+ new SimpleFileVisitor<Path>()
+ {
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+ {
+ // logger.info("RepositoryAudit: Delete " + file);
+ file.toFile().delete();
+ return(FileVisitResult.CONTINUE);
+ }
+
+ public FileVisitResult postVisitDirectory(Path file, IOException e)
+ throws IOException
+ {
+ if (e == null)
+ {
+ // logger.info("RepositoryAudit: Delete " + file);
+ file.toFile().delete();
+ return(FileVisitResult.CONTINUE);
+ }
+ else
+ {
+ throw(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Run a process, and wait for the response
+ *
+ * @param timeoutInSeconds the number of seconds to wait for the
+ * process to terminate
+ * @param directory the execution directory of the process
+ * (null = current directory)
+ * @param stdout the file to contain the standard output
+ * (null = discard standard output)
+ * @param command command and arguments
+ * @return the return value of the process
+ * @throws IOException, InterruptedException
+ */
+ static int runProcess(long timeoutInSeconds,
+ File directory, File stdout, String... command)
+ throws IOException, InterruptedException
+ {
+ ProcessBuilder pb = new ProcessBuilder(command);
+ if (directory != null)
+ {
+ pb.directory(directory);
+ }
+ if (stdout != null)
+ {
+ pb.redirectOutput(stdout);
+ }
+
+ Process process = pb.start();
+ if (process.waitFor(timeoutInSeconds, TimeUnit.SECONDS))
+ {
+ // process terminated before the timeout
+ return(process.exitValue());
+ }
+
+ // process timed out -- kill it, and return -1
+ process.destroyForcibly();
+ return(-1);
+ }
+
+ /* ============================================================ */
+
+ /**
+ * An instance of this class exists for each artifact that we are trying
+ * to download.
+ */
+ static class Artifact
+ {
+ String groupId, artifactId, version, type;
+
+ /**
+ * Constructor - populate the 'Artifact' instance
+ *
+ * @param groupId groupId of artifact
+ * @param artifactId artifactId of artifact
+ * @param version version of artifact
+ * @param type type of the artifact (e.g. "jar")
+ */
+ Artifact(String groupId, String artifactId, String version, String type)
+ {
+ this.groupId = groupId;
+ this.artifactId = artifactId;
+ this.version = version;
+ this.type = type;
+ }
+
+ /**
+ * Constructor - populate an 'Artifact' instance
+ *
+ * @param artifact a string of the form:
+ * "<groupId>/<artifactId>/<version>[/<type>]"
+ * @throws IllegalArgumentException if 'artifact' has the incorrect format
+ */
+ Artifact(String artifact)
+ {
+ String[] segments = artifact.split("/");
+ if (segments.length != 4 && segments.length != 3)
+ {
+ throw(new IllegalArgumentException("groupId/artifactId/version/type"));
+ }
+ groupId = segments[0];
+ artifactId = segments[1];
+ version = segments[2];
+ type = (segments.length == 4 ? segments[3] : "jar");
+ }
+
+ /**
+ * @return the artifact id in the form:
+ * "<groupId>/<artifactId>/<version>/<type>"
+ */
+ public String toString()
+ {
+ return(groupId + "/" + artifactId + "/" + version + "/" + type);
+ }
+ }
+}
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java
new file mode 100644
index 00000000..6d47039e
--- /dev/null
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementFeature.java
@@ -0,0 +1,275 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import java.io.IOException;
+import java.util.Observer;
+import java.util.Properties;
+
+import org.onap.policy.drools.statemanagement.StateManagementFeatureAPI;
+import org.onap.policy.common.im.StandbyStatusException;
+import org.onap.policy.common.im.StateManagement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.onap.policy.drools.core.PolicySessionFeatureAPI;
+import org.onap.policy.drools.features.PolicyEngineFeatureAPI;
+import org.onap.policy.drools.utils.PropertyUtil;
+
+/**
+ * If this feature is supported, there is a single instance of it.
+ * It adds persistence to Drools sessions, but it is also intertwined with
+ * active/standby state management and IntegrityMonitor. For now, they are
+ * all treated as a single feature, but it would be nice to separate them.
+ *
+ * The bulk of the code here was once in other classes, such as
+ * 'PolicyContainer' and 'Main'. It was moved here as part of making this
+ * a separate optional feature.
+ */
+
+public class StateManagementFeature implements StateManagementFeatureAPI,
+ PolicySessionFeatureAPI, PolicyEngineFeatureAPI
+{
+ // get an instance of logger
+ private static final Logger logger =
+ LoggerFactory.getLogger(StateManagementFeature.class);
+
+ private DroolsPDPIntegrityMonitor droolsPdpIntegrityMonitor = null;
+ private StateManagement stateManagement = null;
+
+ /**************************/
+ /* 'FeatureAPI' interface */
+ /**************************/
+
+ public StateManagementFeature(){
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature() constructor");
+ }
+ }
+
+ @Override
+ public void globalInit(String args[], String configDir)
+ {
+ // Initialization code associated with 'PolicyContainer'
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature.globalInit({}) entry", configDir);
+ }
+
+ try
+ {
+ droolsPdpIntegrityMonitor = DroolsPDPIntegrityMonitor.init(configDir);
+ }
+ catch (Exception e)
+ {
+ if(logger.isDebugEnabled()){
+ logger.debug("DroolsPDPIntegrityMonitor initialization exception: ", e);
+ }
+ logger.error("DroolsPDPIntegrityMonitor.init()", e);
+ }
+
+ initializeProperties(configDir);
+
+ //At this point the DroolsPDPIntegrityMonitor instance must exist. Let's check it.
+ try {
+ droolsPdpIntegrityMonitor = DroolsPDPIntegrityMonitor.getInstance();
+ stateManagement = droolsPdpIntegrityMonitor.getStateManager();
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature.globalInit(): "
+ + "stateManagement.getAdminState(): {}", stateManagement.getAdminState());
+ }
+ if(stateManagement == null){
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature.globalInit(): stateManagement is NULL!");
+ }
+ }
+ } catch (Exception e1) {
+ String msg = " \n";
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature.globalInit(): DroolsPDPIntegrityMonitor"
+ + " initialization failed with exception:", e1);
+ }
+ logger.error("DroolsPDPIntegrityMonitor.init(): StateManagementFeature startup failed "
+ + "to get DroolsPDPIntegrityMonitor instance:", e1);
+ e1.printStackTrace();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void addObserver(Observer stateChangeObserver) {
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature.addObserver() entry\n"
+ + "StateManagementFeature.addObserver(): "
+ + "stateManagement.getAdminState(): {}", stateManagement.getAdminState());
+ }
+ stateManagement.addObserver(stateChangeObserver);
+ if(logger.isDebugEnabled()){
+ logger.debug("StateManagementFeature.addObserver() exit");
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getAdminState() {
+ return stateManagement.getAdminState();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getOpState() {
+ return stateManagement.getOpState();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getAvailStatus() {
+ return stateManagement.getAvailStatus();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getStandbyStatus() {
+ return stateManagement.getStandbyStatus();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getStandbyStatus(String resourceName) {
+ return stateManagement.getStandbyStatus(resourceName);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void disableFailed(String resourceName) throws Exception {
+ stateManagement.disableFailed(resourceName);
+
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void disableFailed() throws Exception {
+ stateManagement.disableFailed();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void promote() throws StandbyStatusException, Exception {
+ stateManagement.promote();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void demote() throws Exception {
+ stateManagement.demote();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getResourceName() {
+ return StateManagementProperties.getProperty(StateManagementProperties.NODE_NAME);
+ }
+
+ /**
+ * {@inheritDoc}
+ * @return
+ */
+ @Override
+ public boolean lock(){
+ try{
+ stateManagement.lock();
+ }catch(Exception e){
+ logger.error("StateManagementFeature.lock() failed with exception: {}", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws Exception
+ */
+ @Override
+ public boolean unlock(){
+ try{
+ stateManagement.unlock();
+ }catch(Exception e){
+ logger.error("StateManagementFeature.unlock() failed with exception: {}", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * @throws Exception
+ */
+ @Override
+ public boolean isLocked(){
+ String admin = stateManagement.getAdminState();
+ if(admin.equals(StateManagement.LOCKED)){
+ return true;
+ }else{
+ return false;
+ }
+ }
+
+ @Override
+ public int getSequenceNumber() {
+ return SEQ_NUM;
+ }
+
+ /**
+ * Read in the properties and initialize the StateManagementProperties.
+ */
+ private static void initializeProperties(String configDir)
+ {
+ //Get the state management properties
+ try {
+ Properties pIm =
+ PropertyUtil.getProperties(configDir + "/feature-state-management.properties");
+ StateManagementProperties.initProperties(pIm);
+ logger.info("initializeProperties: resourceName= {}", StateManagementProperties.getProperty(StateManagementProperties.NODE_NAME));
+ } catch (IOException e1) {
+ logger.error("initializeProperties", e1);
+ }
+ }
+}
diff --git a/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java
new file mode 100644
index 00000000..c8e17ea9
--- /dev/null
+++ b/feature-state-management/src/main/java/org/onap/policy/drools/statemanagement/StateManagementProperties.java
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * feature-state-management
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement;
+
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StateManagementProperties {
+ // get an instance of logger
+ private static final Logger logger = LoggerFactory.getLogger(StateManagementProperties.class);
+
+ public static final String NODE_NAME = "resource.name";
+ public static final String SITE_NAME = "site_name";
+
+ public static final String DB_DRIVER = "javax.persistence.jdbc.driver";
+ public static final String DB_URL = "javax.persistence.jdbc.url";
+ public static final String DB_USER = "javax.persistence.jdbc.user";
+ public static final String DB_PWD = "javax.persistence.jdbc.password";
+
+ private static Properties properties = null;
+ /*
+ * Initialize the parameter values from the feature-state-management.properties file values
+ *
+ * This is designed so that the Properties object is obtained from the feature-state-management.properties
+ * file and then is passed to this method to initialize the value of the parameters.
+ * This allows the flexibility of JUnit tests using getProperties(filename) to get the
+ * properties while runtime methods can use getPropertiesFromClassPath(filename).
+ *
+ */
+ public static void initProperties (Properties prop){
+ logger.info("StateManagementProperties.initProperties(Properties): entry");
+ logger.info("\n\nStateManagementProperties.initProperties: Properties = \n{}\n\n", prop);
+
+ properties = prop;
+ }
+
+ public static String getProperty(String key){
+ return properties.getProperty(key);
+ }
+
+ public static Properties getProperties() {
+ return properties;
+ }
+}
diff --git a/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI
new file mode 100644
index 00000000..9ffef571
--- /dev/null
+++ b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureAPI
@@ -0,0 +1 @@
+org.onap.policy.drools.statemanagement.StateManagementFeature \ No newline at end of file
diff --git a/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
new file mode 100644
index 00000000..74d0b995
--- /dev/null
+++ b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureAPI
@@ -0,0 +1 @@
+org.onap.policy.drools.statemanagement.StateManagementFeature
diff --git a/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI
new file mode 100644
index 00000000..74d0b995
--- /dev/null
+++ b/feature-state-management/src/main/resources/META-INF/services/org.onap.policy.drools.statemanagement.StateManagementFeatureAPI
@@ -0,0 +1 @@
+org.onap.policy.drools.statemanagement.StateManagementFeature
diff --git a/feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java b/feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java
new file mode 100644
index 00000000..e458dcea
--- /dev/null
+++ b/feature-state-management/src/test/java/org/onap/policy/drools/statemanagement/test/StateManagementTest.java
@@ -0,0 +1,245 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-persistence
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.statemanagement.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
+import javax.persistence.Persistence;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.onap.policy.common.im.StateManagement;
+import org.onap.policy.drools.core.PolicySessionFeatureAPI;
+import org.onap.policy.drools.statemanagement.StateManagementFeatureAPI;
+import org.onap.policy.drools.statemanagement.StateManagementProperties;
+
+public class StateManagementTest {
+
+ // get an instance of logger
+ private static Logger logger = LoggerFactory.getLogger(StateManagementTest.class);
+
+ /*
+ * Sleep 5 seconds after each test to allow interrupt (shutdown) recovery.
+ */
+
+ private long interruptRecoveryTime = 1000;
+
+ StateManagementFeatureAPI stateManagementFeature;
+
+ /*
+ * All you need to do here is create an instance of StateManagementFeature class. Then,
+ * check it initial state and the state after diableFailed() and promote()
+ */
+
+ @BeforeClass
+ public static void setUpClass() throws Exception {
+
+ logger.info("setUpClass: Entering");
+
+ String userDir = System.getProperty("user.dir");
+ logger.debug("setUpClass: userDir=" + userDir);
+ System.setProperty("com.sun.management.jmxremote.port", "9980");
+ System.setProperty("com.sun.management.jmxremote.authenticate","false");
+
+ initializeDb();
+
+ logger.info("setUpClass: Exiting");
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+
+ /*
+ * Verifies that StateManagementFeature starts and runs successfully.
+ */
+
+ //@Ignore
+ @Test
+ public void testStateManagementOperation() throws Exception {
+
+ logger.debug("\n\ntestStateManagementOperation: Entering\n\n");
+
+ logger.debug("testStateManagementOperation: Reading StateManagementProperties");
+
+ String configDir = "src/test/resources";
+
+ Properties fsmProperties = new Properties();
+ fsmProperties.load(new FileInputStream(new File(
+ configDir + "/feature-state-management.properties")));
+ String thisPdpId = fsmProperties
+ .getProperty(StateManagementProperties.NODE_NAME);
+
+ StateManagementFeatureAPI stateManagementFeature = null;
+ for (StateManagementFeatureAPI feature : StateManagementFeatureAPI.impl.getList())
+ {
+ ((PolicySessionFeatureAPI) feature).globalInit(null, configDir);
+ stateManagementFeature = feature;
+ logger.debug("testStateManagementOperation stateManagementFeature.getResourceName(): " + stateManagementFeature.getResourceName());
+ break;
+ }
+ if(stateManagementFeature == null){
+ String msg = "testStateManagementOperation failed to initialize. "
+ + "Unable to get instance of StateManagementFeatureAPI "
+ + "with resourceID: " + thisPdpId;
+ logger.error(msg);
+ logger.debug(msg);
+ }
+
+ Thread.sleep(interruptRecoveryTime);
+
+ String admin = stateManagementFeature.getAdminState();
+ String oper = stateManagementFeature.getOpState();
+ String avail = stateManagementFeature.getAvailStatus();
+ String standby = stateManagementFeature.getStandbyStatus();
+
+ logger.debug("admin = {}", admin);
+ System.out.println("admin = " + admin);
+ logger.debug("oper = {}", oper);
+ System.out.println("oper = " + oper);
+ logger.debug("avail = {}", avail);
+ System.out.println("avail = " + avail);
+ logger.debug("standby = {}", standby);
+ System.out.println("standby = " + standby);
+
+ assertTrue("Admin state not unlocked after initialization", admin.equals(StateManagement.UNLOCKED));
+ assertTrue("Operational state not enabled after initialization", oper.equals(StateManagement.ENABLED));
+
+ try{
+ stateManagementFeature.disableFailed();
+ }catch(Exception e){
+ logger.error(e.getMessage());
+ System.out.println(e.getMessage());
+ assertTrue(e.getMessage(), false);
+ }
+
+ Thread.sleep(interruptRecoveryTime);
+
+ admin = stateManagementFeature.getAdminState();
+ oper = stateManagementFeature.getOpState();
+ avail = stateManagementFeature.getAvailStatus();
+ standby = stateManagementFeature.getStandbyStatus();
+
+ logger.debug("after disableFailed()");
+ System.out.println("after disableFailed()");
+ logger.debug("admin = {}", admin);
+ System.out.println("admin = " + admin);
+ logger.debug("oper = {}", oper);
+ System.out.println("oper = " + oper);
+ logger.debug("avail = {}", avail);
+ System.out.println("avail = " + avail);
+ logger.debug("standby = {}", standby);
+ System.out.println("standby = " + standby);
+
+ assertTrue("Operational state not disabled after disableFailed()", oper.equals(StateManagement.DISABLED));
+ assertTrue("Availability status not failed after disableFailed()", avail.equals(StateManagement.FAILED));
+
+
+ try{
+ stateManagementFeature.promote();
+ }catch(Exception e){
+ logger.debug(e.getMessage());
+ System.out.println(e.getMessage());
+ }
+
+ Thread.sleep(interruptRecoveryTime);
+
+ admin = stateManagementFeature.getAdminState();
+ oper = stateManagementFeature.getOpState();
+ avail = stateManagementFeature.getAvailStatus();
+ standby = stateManagementFeature.getStandbyStatus();
+
+ logger.debug("after promote()");
+ System.out.println("after promote()");
+ logger.debug("admin = {}", admin);
+ System.out.println("admin = " + admin);
+ logger.debug("oper = {}", oper);
+ System.out.println("oper = " + oper);
+ logger.debug("avail = {}", avail);
+ System.out.println("avail = " + avail);
+ logger.debug("standby = {}", standby);
+ System.out.println("standby = " + standby);
+
+ assertTrue("Standby status not coldstandby after promote()", standby.equals(StateManagement.COLD_STANDBY));
+
+ logger.debug("\n\ntestStateManagementOperation: Exiting\n\n");
+ }
+
+ /*
+ * This method initializes and cleans the DB so that PDP-D will be able to
+ * store fresh records in the DB.
+ */
+
+ public static void initializeDb(){
+
+ logger.debug("initializeDb: Entering");
+
+ Properties cleanProperties = new Properties();
+ cleanProperties.put(StateManagementProperties.DB_DRIVER,"org.h2.Driver");
+ cleanProperties.put(StateManagementProperties.DB_URL, "jdbc:h2:file:./sql/statemanagement");
+ cleanProperties.put(StateManagementProperties.DB_USER, "sa");
+ cleanProperties.put(StateManagementProperties.DB_PWD, "");
+
+ EntityManagerFactory emf = Persistence.createEntityManagerFactory("junitPU", cleanProperties);
+
+ EntityManager em = emf.createEntityManager();
+ // Start a transaction
+ EntityTransaction et = em.getTransaction();
+
+ et.begin();
+
+ // Clean up the DB
+ em.createQuery("Delete from StateManagementEntity").executeUpdate();
+ em.createQuery("Delete from ForwardProgressEntity").executeUpdate();
+ em.createQuery("Delete from ResourceRegistrationEntity").executeUpdate();
+
+ // commit transaction
+ et.commit();
+ em.close();
+
+ logger.debug("initializeDb: Exiting");
+ }
+}
diff --git a/feature-state-management/src/test/resources/META-INF/persistence.xml b/feature-state-management/src/test/resources/META-INF/persistence.xml
new file mode 100644
index 00000000..d26ab443
--- /dev/null
+++ b/feature-state-management/src/test/resources/META-INF/persistence.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ feature-state-management
+ ================================================================================
+ Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<persistence version="2.1"
+ xmlns="http://xmlns.jcp.org/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/persistence http://xmlns.jcp.org/xml/ns/persistence/persistence_2_1.xsd">
+
+ <persistence-unit name="junitPU" transaction-type="RESOURCE_LOCAL">
+ <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>
+ <class>org.onap.policy.common.im.jpa.StateManagementEntity</class>
+ <class>org.onap.policy.common.im.jpa.ForwardProgressEntity</class>
+ <class>org.onap.policy.common.im.jpa.ResourceRegistrationEntity</class>
+ <properties>
+ <property name="javax.persistence.schema-generation.database.action" value="drop-and-create"/>
+ <property name="javax.persistence.schema-generation.scripts.action" value="drop-and-create"/>
+ <property name="javax.persistence.schema-generation.scripts.create-target" value="./sql/generatedCreateStateManagement.ddl"/>
+ <property name="javax.persistence.schema-generation.scripts.drop-target" value="./sql/generatedDropStateManagement.ddl"/>
+ </properties>
+ </persistence-unit>
+
+</persistence>
diff --git a/feature-state-management/src/test/resources/feature-state-management.properties b/feature-state-management/src/test/resources/feature-state-management.properties
new file mode 100644
index 00000000..7b4a697e
--- /dev/null
+++ b/feature-state-management/src/test/resources/feature-state-management.properties
@@ -0,0 +1,74 @@
+###
+# ============LICENSE_START=======================================================
+# feature-state-management
+# ================================================================================
+# Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+# DB properties
+javax.persistence.jdbc.driver = org.h2.Driver
+javax.persistence.jdbc.url = jdbc:h2:file:./sql/statemanagement
+javax.persistence.jdbc.user = sa
+javax.persistence.jdbc.password =
+
+# DroolsPDPIntegrityMonitor Properties
+hostPort = 0.0.0.0:57692
+
+#IntegrityMonitor Properties
+
+# Must be unique across the system
+resource.name=pdp1
+# Name of the site in which this node is hosted
+site_name = pdp_1
+# Forward Progress Monitor update interval seconds
+fp_monitor_interval = 30
+# Failed counter threshold before failover
+failed_counter_threshold = 3
+# Interval between test transactions when no traffic seconds
+test_trans_interval = 10
+# Interval between writes of the FPC to the DB seconds
+write_fpc_interval = 5
+# Node type Note: Make sure you don't leave any trailing spaces, or you'll get an 'invalid node type' error!
+node_type = pdp_drools
+# Dependency groups are groups of resources upon which a node operational state is dependent upon.
+# Each group is a comma-separated list of resource names and groups are separated by a semicolon. For example:
+# dependency_groups=site_1.astra_1,site_1.astra_2;site_1.brms_1,site_1.brms_2;site_1.logparser_1;site_1.pypdp_1
+dependency_groups=
+# When set to true, dependent health checks are performed by using JMX to invoke test() on the dependent.
+# The default false is to use state checks for health.
+test_via_jmx=true
+# This is the max number of seconds beyond which a non incrementing FPC is considered a failure
+max_fpc_update_interval=120
+# Run the state audit every 60 seconds (60000 ms). The state audit finds stale DB entries in the
+# forwardprogressentity table and marks the node as disabled/failed in the statemanagemententity
+# table. NOTE! It will only run on nodes that have a standbystatus = providingservice.
+# A value of <= 0 will turn off the state audit.
+state_audit_interval_ms=60000
+# The refresh state audit is run every (default) 10 minutes (600000 ms) to clean up any state corruption in the
+# DB statemanagemententity table. It only refreshes the DB state entry for the local node. That is, it does not
+# refresh the state of any other nodes. A value <= 0 will turn the audit off. Any other value will override
+# the default of 600000 ms.
+refresh_state_audit_interval_ms=600000
+
+
+# Repository audit properties
+# Flag to control the execution of the subsystemTest for the Nexus Maven repository
+repository.audit.is.active=false
+repository.audit.ignore.errors=true
+
+# DB Audit Properties
+# Flag to control the execution of the subsystemTest for the Database
+db.audit.is.active=false
diff --git a/feature-state-management/src/test/resources/logback-test.xml b/feature-state-management/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..58cabf98
--- /dev/null
+++ b/feature-state-management/src/test/resources/logback-test.xml
@@ -0,0 +1,47 @@
+<!--
+ ============LICENSE_START=======================================================
+ feature-state-management
+ ================================================================================
+ Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+ -->
+
+<!-- Controls the output of logs for JUnit tests -->
+
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <Pattern>
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n
+ </Pattern>
+ </encoder>
+ </appender>
+ <appender name="FILE" class="ch.qos.logback.core.FileAppender">
+ <file>logs/debug.log</file>
+ <encoder>
+ <Pattern>
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36}.%M\(%line\) - %msg%n
+ </Pattern>
+ </encoder>
+ </appender>
+
+ <root level="debug">
+ <appender-ref ref="STDOUT" />
+ <appender-ref ref="FILE" />
+ </root>
+
+</configuration>
+
diff --git a/packages/install/pom.xml b/packages/install/pom.xml
index 2b3a136d..7a765c3a 100644
--- a/packages/install/pom.xml
+++ b/packages/install/pom.xml
@@ -95,6 +95,12 @@
<version>${project.version}</version>
<type>zip</type>
</dependency>
+ <dependency>
+ <groupId>org.onap.policy.drools-pdp</groupId>
+ <artifactId>feature-state-management</artifactId>
+ <version>${project.version}</version>
+ <type>zip</type>
+ </dependency>
</dependencies>
</project>
diff --git a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
index cf94bfcb..9fc2c837 100644
--- a/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
+++ b/policy-core/src/main/java/org/onap/policy/drools/core/PolicyContainer.java
@@ -457,10 +457,19 @@ public class PolicyContainer implements Startable
logger.info("updateToVersion:releaseId " + releaseId.toString());
}
- // notify all 'PolicySession' instances
+ // stop all session threads
+ for (PolicySession session : sessions.values())
+ {
+ session.stopThread();
+ }
+
+ // update the version
Results results = kieContainer.updateToVersion(releaseId);
+
+ // restart all session threads, and notify the sessions
for (PolicySession session : sessions.values())
{
+ session.startThread();
session.updated();
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
index cc3705ee..09ee9a4e 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/TopicEndpoint.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -29,214 +29,217 @@ import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
import org.onap.policy.drools.event.comm.bus.NoopTopicSink;
import org.onap.policy.drools.event.comm.bus.UebTopicSink;
import org.onap.policy.drools.event.comm.bus.UebTopicSource;
-import org.slf4j.LoggerFactory;
-import org.slf4j.Logger;
import org.onap.policy.drools.properties.Lockable;
import org.onap.policy.drools.properties.Startable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
- * Abstraction to managed the system's Networked Topic Endpoints,
- * sources of all events input into the System.
+ * Abstraction to managed the system's Networked Topic Endpoints, sources of all events input into
+ * the System.
*/
public interface TopicEndpoint extends Startable, Lockable {
-
- /**
- * Add Topic Sources to the communication infrastructure initialized per
- * properties
- *
- * @param properties properties for Topic Source construction
- * @return a generic Topic Source
- * @throws IllegalArgumentException when invalid arguments are provided
- */
- public List<TopicSource> addTopicSources(Properties properties);
-
- /**
- * Add Topic Sinks to the communication infrastructure initialized per
- * properties
- *
- * @param properties properties for Topic Sink construction
- * @return a generic Topic Sink
- * @throws IllegalArgumentException when invalid arguments are provided
- */
- public List<TopicSink> addTopicSinks(Properties properties);
-
- /**
- * gets all Topic Sources
- * @return the Topic Source List
- */
- List<TopicSource> getTopicSources();
-
- /**
- * get the Topic Sources for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source List
- * @throws IllegalStateException if the entity is in an invalid state
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<TopicSource> getTopicSources(List<String> topicNames);
-
- /**
- * gets the Topic Source for the given topic name and
- * underlying communication infrastructure type
- *
- * @param commType communication infrastructure type
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- * @throws UnsupportedOperationException if the operation is not supported.
- */
- public TopicSource getTopicSource(Topic.CommInfrastructure commType,
- String topicName)
- throws UnsupportedOperationException;
-
- /**
- * get the UEB Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the UEB Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSource getUebTopicSource(String topicName);
-
- /**
- * get the DMAAP Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the DMAAP Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSource getDmaapTopicSource(String topicName);
-
- /**
- * get the Topic Sinks for the given topic name
- *
- * @param topicNames the topic names
- * @return the Topic Sink List
- * @throws IllegalStateException
- * @throws IllegalArgumentException
- */
- public List<TopicSink> getTopicSinks(List<String> topicNames);
-
- /**
- * get the Topic Sinks for the given topic name and
- * underlying communication infrastructure type
- *
- * @param topicName the topic name
- * @param commType communication infrastructure type
- *
- * @return the Topic Sink List
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicWriters for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public TopicSink getTopicSink(Topic.CommInfrastructure commType,
- String topicName)
- throws UnsupportedOperationException;
-
- /**
- * get the Topic Sinks for the given topic name and
- * all the underlying communication infrastructure type
- *
- * @param topicName the topic name
- * @param commType communication infrastructure type
- *
- * @return the Topic Sink List
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicWriters for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<TopicSink> getTopicSinks(String topicName);
-
- /**
- * get the UEB Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public UebTopicSink getUebTopicSink(String topicName);
-
- /**
- * get the no-op Topic Sink for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public NoopTopicSink getNoopTopicSink(String topicName);
-
- /**
- * get the DMAAP Topic Source for the given topic name
- *
- * @param topicName the topic name
- *
- * @return the Topic Source
- * @throws IllegalStateException if the entity is in an invalid state, for
- * example multiple TopicReaders for a topic name and communication infrastructure
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public DmaapTopicSink getDmaapTopicSink(String topicName);
-
- /**
- * gets only the UEB Topic Sources
- * @return the UEB Topic Source List
- */
- public List<UebTopicSource> getUebTopicSources();
-
- /**
- * gets only the DMAAP Topic Sources
- * @return the DMAAP Topic Source List
- */
- public List<DmaapTopicSource> getDmaapTopicSources();
-
- /**
- * gets all Topic Sinks
- * @return the Topic Sink List
- */
- public List<TopicSink> getTopicSinks();
-
- /**
- * gets only the UEB Topic Sinks
- * @return the UEB Topic Sink List
- */
- public List<UebTopicSink> getUebTopicSinks();
-
- /**
- * gets only the DMAAP Topic Sinks
- * @return the DMAAP Topic Sink List
- */
- public List<DmaapTopicSink> getDmaapTopicSinks();
-
- /**
- * gets only the NOOP Topic Sinks
- * @return the NOOP Topic Sinks List
- */
- public List<NoopTopicSink> getNoopTopicSinks();
-
- /**
- * singleton for global access
- */
- public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
+
+ /**
+ * Add Topic Sources to the communication infrastructure initialized per properties
+ *
+ * @param properties properties for Topic Source construction
+ * @return a generic Topic Source
+ * @throws IllegalArgumentException when invalid arguments are provided
+ */
+ public List<TopicSource> addTopicSources(Properties properties);
+
+ /**
+ * Add Topic Sinks to the communication infrastructure initialized per properties
+ *
+ * @param properties properties for Topic Sink construction
+ * @return a generic Topic Sink
+ * @throws IllegalArgumentException when invalid arguments are provided
+ */
+ public List<TopicSink> addTopicSinks(Properties properties);
+
+ /**
+ * gets all Topic Sources
+ *
+ * @return the Topic Source List
+ */
+ List<TopicSource> getTopicSources();
+
+ /**
+ * get the Topic Sources for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source List
+ * @throws IllegalStateException if the entity is in an invalid state
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<TopicSource> getTopicSources(List<String> topicNames);
+
+ /**
+ * gets the Topic Source for the given topic name and underlying communication infrastructure type
+ *
+ * @param commType communication infrastructure type
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ * @throws UnsupportedOperationException if the operation is not supported.
+ */
+ public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException;
+
+ /**
+ * get the UEB Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the UEB Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public UebTopicSource getUebTopicSource(String topicName);
+
+ /**
+ * get the DMAAP Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the DMAAP Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSource getDmaapTopicSource(String topicName);
+
+ /**
+ * get the Topic Sinks for the given topic name
+ *
+ * @param topicNames the topic names
+ * @return the Topic Sink List
+ * @throws IllegalStateException
+ * @throws IllegalArgumentException
+ */
+ public List<TopicSink> getTopicSinks(List<String> topicNames);
+
+ /**
+ * get the Topic Sinks for the given topic name and underlying communication infrastructure type
+ *
+ * @param topicName the topic name
+ * @param commType communication infrastructure type
+ *
+ * @return the Topic Sink List
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicWriters for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException;
+
+ /**
+ * get the Topic Sinks for the given topic name and all the underlying communication
+ * infrastructure type
+ *
+ * @param topicName the topic name
+ * @param commType communication infrastructure type
+ *
+ * @return the Topic Sink List
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicWriters for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<TopicSink> getTopicSinks(String topicName);
+
+ /**
+ * get the UEB Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public UebTopicSink getUebTopicSink(String topicName);
+
+ /**
+ * get the no-op Topic Sink for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public NoopTopicSink getNoopTopicSink(String topicName);
+
+ /**
+ * get the DMAAP Topic Source for the given topic name
+ *
+ * @param topicName the topic name
+ *
+ * @return the Topic Source
+ * @throws IllegalStateException if the entity is in an invalid state, for example multiple
+ * TopicReaders for a topic name and communication infrastructure
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public DmaapTopicSink getDmaapTopicSink(String topicName);
+
+ /**
+ * gets only the UEB Topic Sources
+ *
+ * @return the UEB Topic Source List
+ */
+ public List<UebTopicSource> getUebTopicSources();
+
+ /**
+ * gets only the DMAAP Topic Sources
+ *
+ * @return the DMAAP Topic Source List
+ */
+ public List<DmaapTopicSource> getDmaapTopicSources();
+
+ /**
+ * gets all Topic Sinks
+ *
+ * @return the Topic Sink List
+ */
+ public List<TopicSink> getTopicSinks();
+
+ /**
+ * gets only the UEB Topic Sinks
+ *
+ * @return the UEB Topic Sink List
+ */
+ public List<UebTopicSink> getUebTopicSinks();
+
+ /**
+ * gets only the DMAAP Topic Sinks
+ *
+ * @return the DMAAP Topic Sink List
+ */
+ public List<DmaapTopicSink> getDmaapTopicSinks();
+
+ /**
+ * gets only the NOOP Topic Sinks
+ *
+ * @return the NOOP Topic Sinks List
+ */
+ public List<NoopTopicSink> getNoopTopicSinks();
+
+ /**
+ * singleton for global access
+ */
+ public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
}
+
/*
* ----------------- implementation -------------------
*/
@@ -246,398 +249,412 @@ public interface TopicEndpoint extends Startable, Lockable {
* implementations according to the communication infrastructure that are supported
*/
class ProxyTopicEndpointManager implements TopicEndpoint {
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
- /**
- * Is this element locked?
- */
- protected volatile boolean locked = false;
-
- /**
- * Is this element alive?
- */
- protected volatile boolean alive = false;
-
- @Override
- public List<TopicSource> addTopicSources(Properties properties) {
-
- // 1. Create UEB Sources
- // 2. Create DMAAP Sources
-
- List<TopicSource> sources = new ArrayList<>();
-
- sources.addAll(UebTopicSource.factory.build(properties));
- sources.addAll(DmaapTopicSource.factory.build(properties));
-
- if (this.isLocked()) {
- for (TopicSource source : sources) {
- source.lock();
- }
- }
-
- return sources;
- }
-
- @Override
- public List<TopicSink> addTopicSinks(Properties properties) {
- // 1. Create UEB Sinks
- // 2. Create DMAAP Sinks
-
- List<TopicSink> sinks = new ArrayList<>();
-
- sinks.addAll(UebTopicSink.factory.build(properties));
- sinks.addAll(DmaapTopicSink.factory.build(properties));
- sinks.addAll(NoopTopicSink.factory.build(properties));
-
- if (this.isLocked()) {
- for (TopicSink sink : sinks) {
- sink.lock();
- }
- }
-
- return sinks;
- }
-
- @Override
- public List<TopicSource> getTopicSources() {
-
- List<TopicSource> sources = new ArrayList<>();
-
- sources.addAll(UebTopicSource.factory.inventory());
- sources.addAll(DmaapTopicSource.factory.inventory());
-
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks() {
-
- List<TopicSink> sinks = new ArrayList<>();
-
- sinks.addAll(UebTopicSink.factory.inventory());
- sinks.addAll(DmaapTopicSink.factory.inventory());
- sinks.addAll(NoopTopicSink.factory.inventory());
-
- return sinks;
- }
-
- @JsonIgnore
- @Override
- public List<UebTopicSource> getUebTopicSources() {
- return UebTopicSource.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<DmaapTopicSource> getDmaapTopicSources() {
- return DmaapTopicSource.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<UebTopicSink> getUebTopicSinks() {
- return UebTopicSink.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<DmaapTopicSink> getDmaapTopicSinks() {
- return DmaapTopicSink.factory.inventory();
- }
-
- @JsonIgnore
- @Override
- public List<NoopTopicSink> getNoopTopicSinks() {
- return NoopTopicSink.factory.inventory();
- }
-
- @Override
- public boolean start() {
-
- synchronized (this) {
- if (this.locked) {
- throw new IllegalStateException(this + " is locked");
- }
-
- if (this.alive) {
- return true;
- }
-
- this.alive = true;
- }
-
- List<Startable> endpoints = getEndpoints();
-
- boolean success = true;
- for (Startable endpoint: endpoints) {
- try {
- success = endpoint.start() && success;
- } catch (Exception e) {
- success = false;
- logger.error("Problem starting endpoint: {}", endpoint, e);
- }
- }
-
- return success;
- }
-
-
- @Override
- public boolean stop() {
-
- /*
- * stop regardless if it is locked, in other
- * words, stop operation has precedence over
- * locks.
- */
- synchronized (this) {
- this.alive = false;
- }
-
- List<Startable> endpoints = getEndpoints();
-
- boolean success = true;
- for (Startable endpoint: endpoints) {
- try {
- success = endpoint.stop() && success;
- } catch (Exception e) {
- success = false;
- logger.error("Problem stopping endpoint: {}", endpoint, e);
- }
- }
-
- return success;
- }
-
- /**
- *
- * @return list of managed endpoints
- */
- @JsonIgnore
- protected List<Startable> getEndpoints() {
- List<Startable> endpoints = new ArrayList<>();
-
- endpoints.addAll(this.getTopicSources());
- endpoints.addAll(this.getTopicSinks());
-
- return endpoints;
- }
-
- @Override
- public void shutdown() {
- UebTopicSource.factory.destroy();
- UebTopicSink.factory.destroy();
-
- DmaapTopicSource.factory.destroy();
- DmaapTopicSink.factory.destroy();
- }
-
- @Override
- public boolean isAlive() {
- return this.alive;
- }
-
- @Override
- public boolean lock() {
-
- synchronized (this) {
- if (locked)
- return true;
-
- this.locked = true;
- }
-
- for (TopicSource source: this.getTopicSources()) {
- source.lock();
- }
-
- for (TopicSink sink: this.getTopicSinks()) {
- sink.lock();
- }
-
- return true;
- }
-
- @Override
- public boolean unlock() {
- synchronized (this) {
- if (!locked)
- return true;
-
- this.locked = false;
- }
-
- for (TopicSource source: this.getTopicSources()) {
- source.unlock();
- }
-
- for (TopicSink sink: this.getTopicSinks()) {
- sink.unlock();
- }
-
- return true;
- }
-
- @Override
- public boolean isLocked() {
- return this.locked;
- }
-
- @Override
- public List<TopicSource> getTopicSources(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- List<TopicSource> sources = new ArrayList<>();
- for (String topic: topicNames) {
- try {
- TopicSource uebSource = this.getUebTopicSource(topic);
- if (uebSource != null)
- sources.add(uebSource);
- } catch (Exception e) {
- logger.info("No UEB source for topic: {}", topic, e);
- }
-
- try {
- TopicSource dmaapSource = this.getDmaapTopicSource(topic);
- if (dmaapSource != null)
- sources.add(dmaapSource);
- } catch (Exception e) {
- logger.info("No DMAAP source for topic: {}", topic, e);
- }
- }
- return sources;
- }
-
- @Override
- public List<TopicSink> getTopicSinks(List<String> topicNames) {
-
- if (topicNames == null) {
- throw new IllegalArgumentException("must provide a list of topics");
- }
-
- List<TopicSink> sinks = new ArrayList<>();
- for (String topic: topicNames) {
- try {
- TopicSink uebSink = this.getUebTopicSink(topic);
- if (uebSink != null)
- sinks.add(uebSink);
- } catch (Exception e) {
- logger.info("No UEB sink for topic: {}", topic, e);
- }
-
- try {
- TopicSink dmaapSink = this.getDmaapTopicSink(topic);
- if (dmaapSink != null)
- sinks.add(dmaapSink);
- } catch (Exception e) {
- logger.info("No DMAAP sink for topic: {}", topic, e);
- }
- }
- return sinks;
- }
-
- @Override
- public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
- throws UnsupportedOperationException {
-
- if (commType == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- if (topicName == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- switch (commType) {
- case UEB:
- return this.getUebTopicSource(topicName);
- case DMAAP:
- return this.getDmaapTopicSource(topicName);
- case REST:
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- @Override
- public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
- throws UnsupportedOperationException {
- if (commType == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- if (topicName == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- switch (commType) {
- case UEB:
- return this.getUebTopicSink(topicName);
- case DMAAP:
- return this.getDmaapTopicSink(topicName);
- case REST:
- default:
- throw new UnsupportedOperationException("Unsupported " + commType.name());
- }
- }
-
- @Override
- public List<TopicSink> getTopicSinks(String topicName) {
-
- if (topicName == null) {
- throw new IllegalArgumentException
- ("Invalid parameter: a communication infrastructure required to fetch " + topicName);
- }
-
- List<TopicSink> sinks = new ArrayList<>();
-
- try {
- sinks.add(this.getUebTopicSink(topicName));
- } catch (Exception e) {
- logger.debug("No sink for topic: {}", topicName, e);
- }
-
- try {
- sinks.add(this.getDmaapTopicSink(topicName));
- } catch (Exception e) {
- logger.debug("No sink for topic: {}", topicName, e);
- }
-
- return sinks;
- }
-
- @Override
- public UebTopicSource getUebTopicSource(String topicName) {
- return UebTopicSource.factory.get(topicName);
- }
-
- @Override
- public UebTopicSink getUebTopicSink(String topicName) {
- return UebTopicSink.factory.get(topicName);
- }
-
- @Override
- public DmaapTopicSource getDmaapTopicSource(String topicName) {
- return DmaapTopicSource.factory.get(topicName);
- }
-
- @Override
- public DmaapTopicSink getDmaapTopicSink(String topicName) {
- return DmaapTopicSink.factory.get(topicName);
- }
-
- @Override
- public NoopTopicSink getNoopTopicSink(String topicName) {
- return NoopTopicSink.factory.get(topicName);
- }
-
+ /**
+ * Logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(ProxyTopicEndpointManager.class);
+ /**
+ * Is this element locked?
+ */
+ protected volatile boolean locked = false;
+
+ /**
+ * Is this element alive?
+ */
+ protected volatile boolean alive = false;
+
+ @Override
+ public List<TopicSource> addTopicSources(Properties properties) {
+
+ // 1. Create UEB Sources
+ // 2. Create DMAAP Sources
+
+ final List<TopicSource> sources = new ArrayList<>();
+
+ sources.addAll(UebTopicSource.factory.build(properties));
+ sources.addAll(DmaapTopicSource.factory.build(properties));
+
+ if (this.isLocked()) {
+ for (final TopicSource source : sources) {
+ source.lock();
+ }
+ }
+
+ return sources;
+ }
+
+ @Override
+ public List<TopicSink> addTopicSinks(Properties properties) {
+ // 1. Create UEB Sinks
+ // 2. Create DMAAP Sinks
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ sinks.addAll(UebTopicSink.factory.build(properties));
+ sinks.addAll(DmaapTopicSink.factory.build(properties));
+ sinks.addAll(NoopTopicSink.factory.build(properties));
+
+ if (this.isLocked()) {
+ for (final TopicSink sink : sinks) {
+ sink.lock();
+ }
+ }
+
+ return sinks;
+ }
+
+ @Override
+ public List<TopicSource> getTopicSources() {
+
+ final List<TopicSource> sources = new ArrayList<>();
+
+ sources.addAll(UebTopicSource.factory.inventory());
+ sources.addAll(DmaapTopicSource.factory.inventory());
+
+ return sources;
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks() {
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ sinks.addAll(UebTopicSink.factory.inventory());
+ sinks.addAll(DmaapTopicSink.factory.inventory());
+ sinks.addAll(NoopTopicSink.factory.inventory());
+
+ return sinks;
+ }
+
+ @JsonIgnore
+ @Override
+ public List<UebTopicSource> getUebTopicSources() {
+ return UebTopicSource.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<DmaapTopicSource> getDmaapTopicSources() {
+ return DmaapTopicSource.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<UebTopicSink> getUebTopicSinks() {
+ return UebTopicSink.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<DmaapTopicSink> getDmaapTopicSinks() {
+ return DmaapTopicSink.factory.inventory();
+ }
+
+ @JsonIgnore
+ @Override
+ public List<NoopTopicSink> getNoopTopicSinks() {
+ return NoopTopicSink.factory.inventory();
+ }
+
+ @Override
+ public boolean start() {
+
+ synchronized (this) {
+ if (this.locked) {
+ throw new IllegalStateException(this + " is locked");
+ }
+
+ if (this.alive) {
+ return true;
+ }
+
+ this.alive = true;
+ }
+
+ final List<Startable> endpoints = this.getEndpoints();
+
+ boolean success = true;
+ for (final Startable endpoint : endpoints) {
+ try {
+ success = endpoint.start() && success;
+ } catch (final Exception e) {
+ success = false;
+ logger.error("Problem starting endpoint: {}", endpoint, e);
+ }
+ }
+
+ return success;
+ }
+
+
+ @Override
+ public boolean stop() {
+
+ /*
+ * stop regardless if it is locked, in other words, stop operation has precedence over locks.
+ */
+ synchronized (this) {
+ this.alive = false;
+ }
+
+ final List<Startable> endpoints = this.getEndpoints();
+
+ boolean success = true;
+ for (final Startable endpoint : endpoints) {
+ try {
+ success = endpoint.stop() && success;
+ } catch (final Exception e) {
+ success = false;
+ logger.error("Problem stopping endpoint: {}", endpoint, e);
+ }
+ }
+
+ return success;
+ }
+
+ /**
+ *
+ * @return list of managed endpoints
+ */
+ @JsonIgnore
+ protected List<Startable> getEndpoints() {
+ final List<Startable> endpoints = new ArrayList<>();
+
+ endpoints.addAll(this.getTopicSources());
+ endpoints.addAll(this.getTopicSinks());
+
+ return endpoints;
+ }
+
+ @Override
+ public void shutdown() {
+ UebTopicSource.factory.destroy();
+ UebTopicSink.factory.destroy();
+ NoopTopicSink.factory.destroy();
+
+ DmaapTopicSource.factory.destroy();
+ DmaapTopicSink.factory.destroy();
+ }
+
+ @Override
+ public boolean isAlive() {
+ return this.alive;
+ }
+
+ @Override
+ public boolean lock() {
+
+ synchronized (this) {
+ if (this.locked)
+ return true;
+
+ this.locked = true;
+ }
+
+ for (final TopicSource source : this.getTopicSources()) {
+ source.lock();
+ }
+
+ for (final TopicSink sink : this.getTopicSinks()) {
+ sink.lock();
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean unlock() {
+ synchronized (this) {
+ if (!this.locked)
+ return true;
+
+ this.locked = false;
+ }
+
+ for (final TopicSource source : this.getTopicSources()) {
+ source.unlock();
+ }
+
+ for (final TopicSink sink : this.getTopicSinks()) {
+ sink.unlock();
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean isLocked() {
+ return this.locked;
+ }
+
+ @Override
+ public List<TopicSource> getTopicSources(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSource> sources = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSource uebSource = this.getUebTopicSource(topic);
+ if (uebSource != null)
+ sources.add(uebSource);
+ } catch (final Exception e) {
+ logger.debug("No UEB source for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+ if (dmaapSource != null)
+ sources.add(dmaapSource);
+ } catch (final Exception e) {
+ logger.debug("No DMAAP source for topic: {}", topic, e);
+ }
+ }
+ return sources;
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks(List<String> topicNames) {
+
+ if (topicNames == null) {
+ throw new IllegalArgumentException("must provide a list of topics");
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+ for (final String topic : topicNames) {
+ try {
+ final TopicSink uebSink = this.getUebTopicSink(topic);
+ if (uebSink != null)
+ sinks.add(uebSink);
+ } catch (final Exception e) {
+ logger.debug("No UEB sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+ if (dmaapSink != null)
+ sinks.add(dmaapSink);
+ } catch (final Exception e) {
+ logger.debug("No DMAAP sink for topic: {}", topic, e);
+ }
+
+ try {
+ final TopicSink noopSink = this.getNoopTopicSink(topic);
+ if (noopSink != null)
+ sinks.add(noopSink);
+ } catch (final Exception e) {
+ logger.debug("No NOOP sink for topic: {}", topic, e);
+ }
+ }
+ return sinks;
+ }
+
+ @Override
+ public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException {
+
+ if (commType == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ if (topicName == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ switch (commType) {
+ case UEB:
+ return this.getUebTopicSource(topicName);
+ case DMAAP:
+ return this.getDmaapTopicSource(topicName);
+ case REST:
+ default:
+ throw new UnsupportedOperationException("Unsupported " + commType.name());
+ }
+ }
+
+ @Override
+ public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+ throws UnsupportedOperationException {
+ if (commType == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ if (topicName == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ switch (commType) {
+ case UEB:
+ return this.getUebTopicSink(topicName);
+ case DMAAP:
+ return this.getDmaapTopicSink(topicName);
+ case NOOP:
+ return this.getNoopTopicSink(topicName);
+ case REST:
+ default:
+ throw new UnsupportedOperationException("Unsupported " + commType.name());
+ }
+ }
+
+ @Override
+ public List<TopicSink> getTopicSinks(String topicName) {
+ if (topicName == null) {
+ throw new IllegalArgumentException(
+ "Invalid parameter: a communication infrastructure required to fetch " + topicName);
+ }
+
+ final List<TopicSink> sinks = new ArrayList<>();
+
+ try {
+ sinks.add(this.getUebTopicSink(topicName));
+ } catch (final Exception e) {
+ logger.debug("No sink for topic: {}", topicName, e);
+ }
+
+ try {
+ sinks.add(this.getDmaapTopicSink(topicName));
+ } catch (final Exception e) {
+ logger.debug("No sink for topic: {}", topicName, e);
+ }
+
+ try {
+ sinks.add(this.getNoopTopicSink(topicName));
+ } catch (final Exception e) {
+ logger.debug("No sink for topic: {}", topicName, e);
+ }
+
+ return sinks;
+ }
+
+ @Override
+ public UebTopicSource getUebTopicSource(String topicName) {
+ return UebTopicSource.factory.get(topicName);
+ }
+
+ @Override
+ public UebTopicSink getUebTopicSink(String topicName) {
+ return UebTopicSink.factory.get(topicName);
+ }
+
+ @Override
+ public DmaapTopicSource getDmaapTopicSource(String topicName) {
+ return DmaapTopicSource.factory.get(topicName);
+ }
+
+ @Override
+ public DmaapTopicSink getDmaapTopicSink(String topicName) {
+ return DmaapTopicSink.factory.get(topicName);
+ }
+
+ @Override
+ public NoopTopicSink getNoopTopicSink(String topicName) {
+ return NoopTopicSink.factory.get(topicName);
+ }
+
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
index 946e48c0..53315391 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/NoopTopicSinkFactory.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,191 +33,194 @@ import org.slf4j.LoggerFactory;
* Noop Topic Sink Factory
*/
public interface NoopTopicSinkFactory {
-
- /**
- * Creates noop topic sinks based on properties files
- *
- * @param properties Properties containing initialization values
- *
- * @return a noop topic sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public List<NoopTopicSink> build(Properties properties);
-
- /**
- * builds a noop sink
- *
- * @param servers list of servers
- * @param topic topic name
- * @param managed is this sink endpoint managed?
- * @return a noop topic sink
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public NoopTopicSink build(List<String> servers, String topic, boolean managed);
-
- /**
- * Destroys a sink based on the topic
- *
- * @param topic topic name
- * @throws IllegalArgumentException if invalid parameters are present
- */
- public void destroy(String topic);
-
- /**
- * gets a sink based on topic name
- * @param topic the topic name
- *
- * @return a sink with topic name
- * @throws IllegalArgumentException if an invalid topic is provided
- * @throws IllegalStateException if the sink is in an incorrect state
- */
- public NoopTopicSink get(String topic);
-
- /**
- * Provides a snapshot of the UEB Topic Writers
- * @return a list of the UEB Topic Writers
- */
- public List<NoopTopicSink> inventory();
-
- /**
- * Destroys all sinks
- */
- public void destroy();
+
+ /**
+ * Creates noop topic sinks based on properties files
+ *
+ * @param properties Properties containing initialization values
+ *
+ * @return a noop topic sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public List<NoopTopicSink> build(Properties properties);
+
+ /**
+ * builds a noop sink
+ *
+ * @param servers list of servers
+ * @param topic topic name
+ * @param managed is this sink endpoint managed?
+ * @return a noop topic sink
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public NoopTopicSink build(List<String> servers, String topic, boolean managed);
+
+ /**
+ * Destroys a sink based on the topic
+ *
+ * @param topic topic name
+ * @throws IllegalArgumentException if invalid parameters are present
+ */
+ public void destroy(String topic);
+
+ /**
+ * gets a sink based on topic name
+ *
+ * @param topic the topic name
+ *
+ * @return a sink with topic name
+ * @throws IllegalArgumentException if an invalid topic is provided
+ * @throws IllegalStateException if the sink is in an incorrect state
+ */
+ public NoopTopicSink get(String topic);
+
+ /**
+ * Provides a snapshot of the UEB Topic Writers
+ *
+ * @return a list of the UEB Topic Writers
+ */
+ public List<NoopTopicSink> inventory();
+
+ /**
+ * Destroys all sinks
+ */
+ public void destroy();
}
+
/* ------------- implementation ----------------- */
/**
* Factory of noop sinks
*/
class IndexedNoopTopicSinkFactory implements NoopTopicSinkFactory {
- /**
- * Logger
- */
- private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
-
- /**
- * noop topic sinks map
- */
- protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
-
- @Override
- public List<NoopTopicSink> build(Properties properties) {
-
- String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
- if (sinkTopics == null || sinkTopics.isEmpty()) {
- logger.info("{}: no topic for noop sink", this);
- return new ArrayList<>();
- }
-
- List<String> sinkTopicList = new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
- List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
- synchronized(this) {
- for (String topic: sinkTopicList) {
- if (this.noopTopicSinks.containsKey(topic)) {
- newSinks.add(this.noopTopicSinks.get(topic));
- continue;
- }
-
- String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." +
- topic +
- PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
-
- if (servers == null || servers.isEmpty())
- servers = "noop";
-
- List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
-
- String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
- PolicyProperties.PROPERTY_MANAGED_SUFFIX);
- boolean managed = true;
- if (managedString != null && !managedString.isEmpty()) {
- managed = Boolean.parseBoolean(managedString);
- }
-
- NoopTopicSink noopSink = this.build(serverList, topic, managed);
- newSinks.add(noopSink);
- }
- return newSinks;
- }
- }
-
- @Override
- public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
- if (servers == null) {
- servers = new ArrayList<>();
- }
-
- if (servers.isEmpty()) {
- servers.add("noop");
- }
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A topic must be provided");
- }
-
- synchronized (this) {
- if (noopTopicSinks.containsKey(topic)) {
- return noopTopicSinks.get(topic);
- }
-
- NoopTopicSink sink =
- new NoopTopicSink(servers, topic);
-
- if (managed)
- noopTopicSinks.put(topic, sink);
-
- return sink;
- }
- }
-
- @Override
- public void destroy(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A topic must be provided");
- }
-
- NoopTopicSink noopSink;
- synchronized(this) {
- if (!noopTopicSinks.containsKey(topic)) {
- return;
- }
-
- noopSink = noopTopicSinks.remove(topic);
- }
-
- noopSink.shutdown();
- }
-
- @Override
- public void destroy() {
- List<NoopTopicSink> sinks = this.inventory();
- for (NoopTopicSink sink: sinks) {
- sink.shutdown();
- }
-
- synchronized(this) {
- this.noopTopicSinks.clear();
- }
- }
-
- @Override
- public NoopTopicSink get(String topic) {
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("A topic must be provided");
- }
-
- synchronized(this) {
- if (noopTopicSinks.containsKey(topic)) {
- return noopTopicSinks.get(topic);
- } else {
- throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
- }
- }
- }
-
- @Override
- public List<NoopTopicSink> inventory() {
- return new ArrayList<>(this.noopTopicSinks.values());
- }
+ /**
+ * Logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(IndexedUebTopicSinkFactory.class);
+
+ /**
+ * noop topic sinks map
+ */
+ protected HashMap<String, NoopTopicSink> noopTopicSinks = new HashMap<>();
+
+ @Override
+ public List<NoopTopicSink> build(Properties properties) {
+
+ final String sinkTopics = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS);
+ if (sinkTopics == null || sinkTopics.isEmpty()) {
+ logger.info("{}: no topic for noop sink", this);
+ return new ArrayList<>();
+ }
+
+ final List<String> sinkTopicList =
+ new ArrayList<>(Arrays.asList(sinkTopics.split("\\s*,\\s*")));
+ final List<NoopTopicSink> newSinks = new ArrayList<NoopTopicSink>();
+ synchronized (this) {
+ for (final String topic : sinkTopicList) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ newSinks.add(this.noopTopicSinks.get(topic));
+ continue;
+ }
+
+ String servers = properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "."
+ + topic + PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+
+ if (servers == null || servers.isEmpty())
+ servers = "noop";
+
+ final List<String> serverList = new ArrayList<>(Arrays.asList(servers.split("\\s*,\\s*")));
+
+ final String managedString =
+ properties.getProperty(PolicyProperties.PROPERTY_NOOP_SINK_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+ boolean managed = true;
+ if (managedString != null && !managedString.isEmpty()) {
+ managed = Boolean.parseBoolean(managedString);
+ }
+
+ final NoopTopicSink noopSink = this.build(serverList, topic, managed);
+ newSinks.add(noopSink);
+ }
+ return newSinks;
+ }
+ }
+
+ @Override
+ public NoopTopicSink build(List<String> servers, String topic, boolean managed) {
+ if (servers == null) {
+ servers = new ArrayList<>();
+ }
+
+ if (servers.isEmpty()) {
+ servers.add("noop");
+ }
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ return this.noopTopicSinks.get(topic);
+ }
+
+ final NoopTopicSink sink = new NoopTopicSink(servers, topic);
+
+ if (managed)
+ this.noopTopicSinks.put(topic, sink);
+
+ return sink;
+ }
+ }
+
+ @Override
+ public void destroy(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ NoopTopicSink noopSink;
+ synchronized (this) {
+ if (!this.noopTopicSinks.containsKey(topic)) {
+ return;
+ }
+
+ noopSink = this.noopTopicSinks.remove(topic);
+ }
+
+ noopSink.shutdown();
+ }
+
+ @Override
+ public void destroy() {
+ final List<NoopTopicSink> sinks = this.inventory();
+ for (final NoopTopicSink sink : sinks) {
+ sink.shutdown();
+ }
+
+ synchronized (this) {
+ this.noopTopicSinks.clear();
+ }
+ }
+
+ @Override
+ public NoopTopicSink get(String topic) {
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("A topic must be provided");
+ }
+
+ synchronized (this) {
+ if (this.noopTopicSinks.containsKey(topic)) {
+ return this.noopTopicSinks.get(topic);
+ } else {
+ throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+ }
+ }
+ }
+
+ @Override
+ public List<NoopTopicSink> inventory() {
+ return new ArrayList<>(this.noopTopicSinks.values());
+ }
}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index edb03bba..8171c35d 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,427 +34,432 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.mr.client.MRClientFactory;
import com.att.nsa.mr.client.impl.MRConsumerImpl;
import com.att.nsa.mr.client.response.MRConsumerResponse;
import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
-import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
/**
* Wrapper around libraries to consume from message bus
*
*/
public interface BusConsumer {
-
- /**
- * fetch messages
- *
- * @return list of messages
- * @throws Exception when error encountered by underlying libraries
- */
- public Iterable<String> fetch() throws InterruptedException, IOException;
-
- /**
- * close underlying library consumer
- */
- public void close();
-
- /**
- * Cambria based consumer
- */
- public static class CambriaConsumerWrapper implements BusConsumer {
-
- /**
- * Cambria client
- */
- protected CambriaConsumer consumer;
-
- /**
- * Cambria Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws GeneralSecurityException
- * @throws MalformedURLException
- */
- public CambriaConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
- throws IllegalArgumentException {
-
- ConsumerBuilder builder =
- new CambriaClientBuilders.ConsumerBuilder();
-
-
- if (useHttps){
-
- if(useSelfSignedCerts){
- builder.knownAs(consumerGroup, consumerInstance)
- .usingHosts(servers)
- .onTopic(topic)
- .waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit)
- .usingHttps()
- .allowSelfSignedCertificates();
- }
- else{
- builder.knownAs(consumerGroup, consumerInstance)
- .usingHosts(servers)
- .onTopic(topic)
- .waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit)
- .usingHttps();
- }
- }
- else{
- builder.knownAs(consumerGroup, consumerInstance)
- .usingHosts(servers)
- .onTopic(topic)
- .waitAtServer(fetchTimeout)
- .receivingAtMost(fetchLimit);
- }
-
- if (apiKey != null && !apiKey.isEmpty() &&
- apiSecret != null && !apiSecret.isEmpty()) {
- builder.authenticatedBy(apiKey, apiSecret);
- }
-
- try {
- this.consumer = builder.build();
- } catch (MalformedURLException | GeneralSecurityException e) {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
- public Iterable<String> fetch() throws IOException {
- return this.consumer.fetch();
- }
-
- @Override
- public void close() {
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- return "CambriaConsumerWrapper []";
- }
- }
-
- /**
- * MR based consumer
- */
- public abstract class DmaapConsumerWrapper implements BusConsumer {
-
- /**
- * logger
- */
- private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
-
- /**
- * fetch timeout
- */
- protected int fetchTimeout;
-
- /**
- * close condition
- */
- protected Object closeCondition = new Object();
-
- /**
- * MR Consumer
- */
- protected MRConsumerImpl consumer;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param username AAF Login
- * @param password AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String username, String password,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps)
- throws MalformedURLException {
-
- this.fetchTimeout = fetchTimeout;
-
- if (topic == null || topic.isEmpty()) {
- throw new IllegalArgumentException("No topic for DMaaP");
- }
-
- this.consumer = new MRConsumerImpl(servers, topic,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit,
- null, apiKey, apiSecret);
-
- this.consumer.setUsername(username);
- this.consumer.setPassword(password);
- }
-
- @Override
- public Iterable<String> fetch() throws InterruptedException, IOException {
- MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
- if (response == null) {
- logger.warn("{}: DMaaP NULL response received", this);
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
- return new ArrayList<>();
- } else {
- logger.debug("DMaaP consumer received {} : {}" +
- response.getResponseCode(),
- response.getResponseMessage());
-
- if (response.getResponseCode() == null ||
- !response.getResponseCode().equals("200")) {
-
- logger.error("DMaaP consumer received: {} : {}",
- response.getResponseCode(),
- response.getResponseMessage());
-
- synchronized (closeCondition) {
- closeCondition.wait(fetchTimeout);
- }
-
- /* fall through */
- }
- }
-
- if (response.getActualMessages() == null)
- return new ArrayList<>();
- else
- return response.getActualMessages();
- }
-
- @Override
- public void close() {
- synchronized (closeCondition) {
- closeCondition.notifyAll();
- }
-
- this.consumer.close();
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.
- append("DmaapConsumerWrapper [").
- append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
- append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
- append(", consumer.getHost()=").append(consumer.getHost()).
- append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
- append(", consumer.getUsername()=").append(consumer.getUsername()).
- append("]");
- return builder.toString();
- }
- }
-
- /**
- * MR based consumer
- */
- public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
-
- private Properties props;
-
- /**
- * MR Consumer Wrapper
- *
- * @param servers messaging bus hosts
- * @param topic topic
- * @param apiKey API Key
- * @param apiSecret API Secret
- * @param aafLogin AAF Login
- * @param aafPassword AAF Password
- * @param consumerGroup Consumer Group
- * @param consumerInstance Consumer Instance
- * @param fetchTimeout Fetch Timeout
- * @param fetchLimit Fetch Limit
- * @throws MalformedURLException
- */
- public DmaapAafConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String aafLogin, String aafPassword,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
-
- super(servers, topic, apiKey, apiSecret,
- aafLogin, aafPassword,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps);
-
- // super constructor sets servers = {""} if empty to avoid errors when using DME2
- if ((servers.size() == 1 && servers.get(0).equals("")) ||
- (servers == null) || (servers.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
- }
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
-
- props = new Properties();
-
- if(useHttps){
- props.setProperty("Protocol", "https");
- this.consumer.setHost(servers.get(0) + ":3905");
-
- }
- else{
- props.setProperty("Protocol", "http");
- this.consumer.setHost(servers.get(0) + ":3904");
- }
-
- this.consumer.setProps(props);
- logger.info("{}: CREATION", this);
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- MRConsumerImpl consumer = (MRConsumerImpl) this.consumer;
-
- builder.
- append("DmaapConsumerWrapper [").
- append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
- append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
- append(", consumer.getHost()=").append(consumer.getHost()).
- append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
- append(", consumer.getUsername()=").append(consumer.getUsername()).
- append("]");
- return builder.toString();
- }
- }
-
- public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
-
- private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
-
- private Properties props;
-
- public DmaapDmeConsumerWrapper(List<String> servers, String topic,
- String apiKey, String apiSecret,
- String dme2Login, String dme2Password,
- String consumerGroup, String consumerInstance,
- int fetchTimeout, int fetchLimit,
- String environment, String aftEnvironment, String dme2Partner,
- String latitude, String longitude, Map<String,String> additionalProps, boolean useHttps) throws MalformedURLException {
-
-
-
- super(servers, topic, apiKey, apiSecret,
- dme2Login, dme2Password,
- consumerGroup, consumerInstance,
- fetchTimeout, fetchLimit, useHttps);
-
-
- String dme2RouteOffer = additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
-
- if (environment == null || environment.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
- } if (aftEnvironment == null || aftEnvironment.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX + " property for DME2 in DMaaP");
- } if (latitude == null || latitude.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
- } if (longitude == null || longitude.isEmpty()) {
- throw new IllegalArgumentException("Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX + " property for DME2 in DMaaP");
- }
-
- if ((dme2Partner == null || dme2Partner.isEmpty()) && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
- throw new IllegalArgumentException("Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS +
- "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or " +
- PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
- }
-
- String serviceName = servers.get(0);
-
- this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
-
- this.consumer.setUsername(dme2Login);
- this.consumer.setPassword(dme2Password);
-
- props = new Properties();
-
- props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
-
- props.setProperty("username", dme2Login);
- props.setProperty("password", dme2Password);
-
- /* These are required, no defaults */
- props.setProperty("topic", topic);
-
- props.setProperty("Environment", environment);
- props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
-
- if (dme2Partner != null)
- props.setProperty("Partner", dme2Partner);
- if (dme2RouteOffer != null)
- props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
-
- props.setProperty("Latitude", latitude);
- props.setProperty("Longitude", longitude);
-
- /* These are optional, will default to these values if not set in additionalProps */
- props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
- props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
- props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
- props.setProperty("Version", "1.0");
- props.setProperty("SubContextPath", "/");
- props.setProperty("sessionstickinessrequired", "no");
-
- /* These should not change */
- props.setProperty("TransportType", "DME2");
- props.setProperty("MethodType", "GET");
-
- if(useHttps){
- props.setProperty("Protocol", "https");
-
- }
- else{
- props.setProperty("Protocol", "http");
- }
-
- props.setProperty("contenttype", "application/json");
-
- if (additionalProps != null) {
- for(String key : additionalProps.keySet())
- props.put(key, additionalProps.get(key));
- }
-
- MRClientFactory.prop = props;
- this.consumer.setProps(props);
-
- logger.info("{}: CREATION", this);
- }
- }
-}
+ /**
+ * fetch messages
+ *
+ * @return list of messages
+ * @throws Exception when error encountered by underlying libraries
+ */
+ public Iterable<String> fetch() throws InterruptedException, IOException;
+
+ /**
+ * close underlying library consumer
+ */
+ public void close();
+
+ /**
+ * Cambria based consumer
+ */
+ public static class CambriaConsumerWrapper implements BusConsumer {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
+
+ /**
+ * Cambria client
+ */
+ protected CambriaConsumer consumer;
+
+ /**
+ * fetch timeout
+ */
+ protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
+ protected Object closeCondition = new Object();
+
+ /**
+ * Cambria Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws GeneralSecurityException
+ * @throws MalformedURLException
+ */
+ public CambriaConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String consumerGroup, String consumerInstance, int fetchTimeout,
+ int fetchLimit, boolean useHttps, boolean useSelfSignedCerts)
+ throws IllegalArgumentException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+
+ if (useHttps) {
+
+ if (useSelfSignedCerts) {
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps()
+ .allowSelfSignedCertificates();
+ } else {
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit).usingHttps();
+ }
+ } else {
+ builder.knownAs(consumerGroup, consumerInstance).usingHosts(servers).onTopic(topic)
+ .waitAtServer(fetchTimeout).receivingAtMost(fetchLimit);
+ }
+
+ if (apiKey != null && !apiKey.isEmpty() && apiSecret != null && !apiSecret.isEmpty()) {
+ builder.authenticatedBy(apiKey, apiSecret);
+ }
+
+ try {
+ this.consumer = builder.build();
+ } catch (MalformedURLException | GeneralSecurityException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public Iterable<String> fetch() throws IOException, InterruptedException {
+ try {
+ return this.consumer.fetch();
+ } catch (final IOException e) {
+ logger.error("{}: cannot fetch because of {} - backoff for {} ms.", this, e.getMessage(),
+ this.fetchTimeout);
+ synchronized (this.closeCondition) {
+ this.closeCondition.wait(this.fetchTimeout);
+ }
+
+ throw e;
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
+ this.consumer.close();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]");
+ return builder.toString();
+ }
+ }
+
+ /**
+ * MR based consumer
+ */
+ public abstract class DmaapConsumerWrapper implements BusConsumer {
+
+ /**
+ * logger
+ */
+ private static Logger logger = LoggerFactory.getLogger(DmaapConsumerWrapper.class);
+
+ /**
+ * fetch timeout
+ */
+ protected int fetchTimeout;
+
+ /**
+ * close condition
+ */
+ protected Object closeCondition = new Object();
+
+ /**
+ * MR Consumer
+ */
+ protected MRConsumerImpl consumer;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param username AAF Login
+ * @param password AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapConsumerWrapper(List<String> servers, String topic, String apiKey, String apiSecret,
+ String username, String password, String consumerGroup, String consumerInstance,
+ int fetchTimeout, int fetchLimit, boolean useHttps) throws MalformedURLException {
+
+ this.fetchTimeout = fetchTimeout;
+
+ if (topic == null || topic.isEmpty()) {
+ throw new IllegalArgumentException("No topic for DMaaP");
+ }
+
+ this.consumer = new MRConsumerImpl(servers, topic, consumerGroup, consumerInstance,
+ fetchTimeout, fetchLimit, null, apiKey, apiSecret);
+
+ this.consumer.setUsername(username);
+ this.consumer.setPassword(password);
+ }
+
+ @Override
+ public Iterable<String> fetch() throws InterruptedException, IOException {
+ final MRConsumerResponse response = this.consumer.fetchWithReturnConsumerResponse();
+ if (response == null) {
+ logger.warn("{}: DMaaP NULL response received", this);
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+ return new ArrayList<>();
+ } else {
+ logger.debug("DMaaP consumer received {} : {}" + response.getResponseCode(),
+ response.getResponseMessage());
+
+ if (response.getResponseCode() == null || !response.getResponseCode().equals("200")) {
+
+ logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
+ response.getResponseMessage());
+
+ synchronized (closeCondition) {
+ closeCondition.wait(fetchTimeout);
+ }
+
+ /* fall through */
+ }
+ }
+
+ if (response.getActualMessages() == null)
+ return new ArrayList<>();
+ else
+ return response.getActualMessages();
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeCondition) {
+ closeCondition.notifyAll();
+ }
+
+ this.consumer.close();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
+ .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
+ .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
+ .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
+ .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
+ return builder.toString();
+ }
+ }
+ /**
+ * MR based consumer
+ */
+ public static class DmaapAafConsumerWrapper extends DmaapConsumerWrapper {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapAafConsumerWrapper.class);
+
+ private final Properties props;
+
+ /**
+ * MR Consumer Wrapper
+ *
+ * @param servers messaging bus hosts
+ * @param topic topic
+ * @param apiKey API Key
+ * @param apiSecret API Secret
+ * @param aafLogin AAF Login
+ * @param aafPassword AAF Password
+ * @param consumerGroup Consumer Group
+ * @param consumerInstance Consumer Instance
+ * @param fetchTimeout Fetch Timeout
+ * @param fetchLimit Fetch Limit
+ * @throws MalformedURLException
+ */
+ public DmaapAafConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String aafLogin, String aafPassword, String consumerGroup,
+ String consumerInstance, int fetchTimeout, int fetchLimit, boolean useHttps)
+ throws MalformedURLException {
+
+ super(servers, topic, apiKey, apiSecret, aafLogin, aafPassword, consumerGroup,
+ consumerInstance, fetchTimeout, fetchLimit, useHttps);
+
+ // super constructor sets servers = {""} if empty to avoid errors when using DME2
+ if ((servers.size() == 1 && ("".equals(servers.get(0)))) || (servers == null)
+ || (servers.isEmpty())) {
+ throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+ }
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+
+ props = new Properties();
+
+ if (useHttps) {
+ props.setProperty("Protocol", "https");
+ this.consumer.setHost(servers.get(0) + ":3905");
+
+ } else {
+ props.setProperty("Protocol", "http");
+ this.consumer.setHost(servers.get(0) + ":3904");
+ }
+
+ this.consumer.setProps(props);
+ logger.info("{}: CREATION", this);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder builder = new StringBuilder();
+ final MRConsumerImpl consumer = this.consumer;
+
+ builder.append("DmaapConsumerWrapper [").append("consumer.getAuthDate()=")
+ .append(consumer.getAuthDate()).append(", consumer.getAuthKey()=")
+ .append(consumer.getAuthKey()).append(", consumer.getHost()=").append(consumer.getHost())
+ .append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag())
+ .append(", consumer.getUsername()=").append(consumer.getUsername()).append("]");
+ return builder.toString();
+ }
+ }
+
+ public static class DmaapDmeConsumerWrapper extends DmaapConsumerWrapper {
+
+ private static Logger logger = LoggerFactory.getLogger(DmaapDmeConsumerWrapper.class);
+
+ private final Properties props;
+
+ public DmaapDmeConsumerWrapper(List<String> servers, String topic, String apiKey,
+ String apiSecret, String dme2Login, String dme2Password, String consumerGroup,
+ String consumerInstance, int fetchTimeout, int fetchLimit, String environment,
+ String aftEnvironment, String dme2Partner, String latitude, String longitude,
+ Map<String, String> additionalProps, boolean useHttps) throws MalformedURLException {
+
+
+
+ super(servers, topic, apiKey, apiSecret, dme2Login, dme2Password, consumerGroup,
+ consumerInstance, fetchTimeout, fetchLimit, useHttps);
+
+
+ final String dme2RouteOffer =
+ additionalProps.get(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY);
+
+ if (environment == null || environment.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ENVIRONMENT_SUFFIX
+ + " property for DME2 in DMaaP");
+ }
+ if (aftEnvironment == null || aftEnvironment.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_AFT_ENVIRONMENT_SUFFIX
+ + " property for DME2 in DMaaP");
+ }
+ if (latitude == null || latitude.isEmpty()) {
+ throw new IllegalArgumentException("Missing "
+ + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LATITUDE_SUFFIX + " property for DME2 in DMaaP");
+ }
+ if (longitude == null || longitude.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Missing " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_LONGITUDE_SUFFIX
+ + " property for DME2 in DMaaP");
+ }
+
+ if ((dme2Partner == null || dme2Partner.isEmpty())
+ && (dme2RouteOffer == null || dme2RouteOffer.isEmpty())) {
+ throw new IllegalArgumentException(
+ "Must provide at least " + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_PARTNER_SUFFIX + " or "
+ + PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + topic
+ + PolicyProperties.PROPERTY_DMAAP_DME2_ROUTE_OFFER_SUFFIX + " for DME2");
+ }
+
+ final String serviceName = servers.get(0);
+
+ this.consumer.setProtocolFlag(ProtocolTypeConstants.DME2.getValue());
+
+ this.consumer.setUsername(dme2Login);
+ this.consumer.setPassword(dme2Password);
+
+ props = new Properties();
+
+ props.setProperty(DmaapTopicSinkFactory.DME2_SERVICE_NAME_PROPERTY, serviceName);
+
+ props.setProperty("username", dme2Login);
+ props.setProperty("password", dme2Password);
+
+ /* These are required, no defaults */
+ props.setProperty("topic", topic);
+
+ props.setProperty("Environment", environment);
+ props.setProperty("AFT_ENVIRONMENT", aftEnvironment);
+
+ if (dme2Partner != null)
+ props.setProperty("Partner", dme2Partner);
+ if (dme2RouteOffer != null)
+ props.setProperty(DmaapTopicSinkFactory.DME2_ROUTE_OFFER_PROPERTY, dme2RouteOffer);
+
+ props.setProperty("Latitude", latitude);
+ props.setProperty("Longitude", longitude);
+
+ /* These are optional, will default to these values if not set in additionalProps */
+ props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", "50000");
+ props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", "240000");
+ props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", "15000");
+ props.setProperty("Version", "1.0");
+ props.setProperty("SubContextPath", "/");
+ props.setProperty("sessionstickinessrequired", "no");
+
+ /* These should not change */
+ props.setProperty("TransportType", "DME2");
+ props.setProperty("MethodType", "GET");
+
+ if (useHttps) {
+ props.setProperty("Protocol", "https");
+
+ } else {
+ props.setProperty("Protocol", "http");
+ }
+
+ props.setProperty("contenttype", "application/json");
+
+ if (additionalProps != null) {
+ for (final String key : additionalProps.keySet())
+ props.put(key, additionalProps.get(key));
+ }
+
+ MRClientFactory.prop = props;
+ this.consumer.setProps(props);
+
+ logger.info("{}: CREATION", this);
+ }
+ }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java
index f7ef7bcf..dd9a7c2b 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/drools/http/server/test/HttpClientTest.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,6 +22,7 @@ package org.onap.policy.drools.http.server.test;
import static org.junit.Assert.assertTrue;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
@@ -33,186 +34,170 @@ import org.junit.Test;
import org.onap.policy.drools.http.client.HttpClient;
import org.onap.policy.drools.http.server.HttpServletServer;
import org.onap.policy.drools.properties.PolicyProperties;
+import org.onap.policy.drools.utils.NetworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpClientTest {
-
- private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class);
-
- @BeforeClass
- public static void setUp() throws InterruptedException {
- logger.info("-- setup() --");
-
- /* echo server */
-
- HttpServletServer echoServerNoAuth = HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true);
- echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
- echoServerNoAuth.waitedStart(5000);
-
- /* no auth echo server */
-
- HttpServletServer echoServerAuth = HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true);
- echoServerAuth.setBasicAuthentication("x", "y", null);
- echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
- echoServerAuth.waitedStart(5000);
- }
-
- @AfterClass
- public static void tearDown() {
- logger.info("-- tearDown() --");
-
- HttpServletServer.factory.destroy();
- HttpClient.factory.destroy();
- }
-
- @Test
- public void testHttpNoAuthClient() throws Exception {
- logger.info("-- testHttpNoAuthClient() --");
-
- HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false,
- "localhost", 6666, "junit/echo",
- null, null, true);
- Response response = client.get("hello");
- String body = HttpClient.getBody(response, String.class);
-
- assertTrue(response.getStatus() == 200);
- assertTrue(body.equals("hello"));
- }
-
- @Test
- public void testHttpAuthClient() throws Exception {
- logger.info("-- testHttpAuthClient() --");
-
- HttpClient client = HttpClient.factory.build("testHttpAuthClient",false, false,
- "localhost", 6667, "junit/echo",
- "x", "y", true);
- Response response = client.get("hello");
- String body = HttpClient.getBody(response, String.class);
-
- assertTrue(response.getStatus() == 200);
- assertTrue(body.equals("hello"));
- }
-
- @Test
- public void testHttpAuthClient401() throws Exception {
- logger.info("-- testHttpAuthClient401() --");
-
- HttpClient client = HttpClient.factory.build("testHttpAuthClient401",false, false,
- "localhost", 6667, "junit/echo",
- null, null, true);
- Response response = client.get("hello");
- assertTrue(response.getStatus() == 401);
- }
-
- @Test
- public void testHttpAuthClientProps() throws Exception {
- logger.info("-- testHttpAuthClientProps() --");
-
- Properties httpProperties = new Properties();
-
- httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7777");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpap");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
- RestMockHealthCheck.class.getName());
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7778");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpdp");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
- RestMockHealthCheck.class.getName());
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7777");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX,
- "pap/test");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
- "false");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpap");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX,
- "localhost");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX,
- "7778");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX,
- "pdp");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX,
- "false");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX,
- "testpdp");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX,
- "alpha123");
- httpProperties.setProperty
- (PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX,
- "true");
-
- ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties);
- assertTrue(servers.size() == 2);
-
- ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties);
- assertTrue(clients.size() == 2);
-
- for (HttpServletServer server: servers) {
- server.waitedStart(5000);
- }
-
- HttpClient clientPAP = HttpClient.factory.get("PAP");
- Response response = clientPAP.get();
- assertTrue(response.getStatus() == 200);
-
- HttpClient clientPDP = HttpClient.factory.get("PDP");
- Response response2 = clientPDP.get("test");
- assertTrue(response2.getStatus() == 500);
+
+ private static Logger logger = LoggerFactory.getLogger(HttpClientTest.class);
+
+ @BeforeClass
+ public static void setUp() throws InterruptedException, IOException {
+ logger.info("-- setup() --");
+
+ /* echo server */
+
+ final HttpServletServer echoServerNoAuth =
+ HttpServletServer.factory.build("echo", "localhost", 6666, "/", false, true);
+ echoServerNoAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
+ echoServerNoAuth.waitedStart(5000);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", echoServerNoAuth.getPort(), 5, 10000L))
+ throw new IllegalStateException("cannot connect to port " + echoServerNoAuth.getPort());
+
+ /* no auth echo server */
+
+ final HttpServletServer echoServerAuth =
+ HttpServletServer.factory.build("echo", "localhost", 6667, "/", false, true);
+ echoServerAuth.setBasicAuthentication("x", "y", null);
+ echoServerAuth.addServletPackage("/*", HttpClientTest.class.getPackage().getName());
+ echoServerAuth.waitedStart(5000);
+
+ if (!NetworkUtil.isTcpPortOpen("localhost", echoServerAuth.getPort(), 5, 10000L))
+ throw new IllegalStateException("cannot connect to port " + echoServerAuth.getPort());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ logger.info("-- tearDown() --");
+
+ HttpServletServer.factory.destroy();
+ HttpClient.factory.destroy();
+ }
+
+ @Test
+ public void testHttpNoAuthClient() throws Exception {
+ logger.info("-- testHttpNoAuthClient() --");
+
+ final HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false,
+ "localhost", 6666, "junit/echo", null, null, true);
+ final Response response = client.get("hello");
+ final String body = HttpClient.getBody(response, String.class);
+
+ assertTrue(response.getStatus() == 200);
+ assertTrue(body.equals("hello"));
+ }
+
+ @Test
+ public void testHttpAuthClient() throws Exception {
+ logger.info("-- testHttpAuthClient() --");
+
+ final HttpClient client = HttpClient.factory.build("testHttpAuthClient", false, false,
+ "localhost", 6667, "junit/echo", "x", "y", true);
+ final Response response = client.get("hello");
+ final String body = HttpClient.getBody(response, String.class);
+
+ assertTrue(response.getStatus() == 200);
+ assertTrue(body.equals("hello"));
+ }
+
+ @Test
+ public void testHttpAuthClient401() throws Exception {
+ logger.info("-- testHttpAuthClient401() --");
+
+ final HttpClient client = HttpClient.factory.build("testHttpAuthClient401", false, false,
+ "localhost", 6667, "junit/echo", null, null, true);
+ final Response response = client.get("hello");
+ assertTrue(response.getStatus() == 401);
+ }
+
+ @Test
+ public void testHttpAuthClientProps() throws Exception {
+ logger.info("-- testHttpAuthClientProps() --");
+
+ final Properties httpProperties = new Properties();
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(
+ PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
+ RestMockHealthCheck.class.getName());
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(
+ PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX,
+ RestMockHealthCheck.class.getName());
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7777");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pap/test");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpap");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, "localhost");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, "7778");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, "pdp");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, "false");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, "testpdp");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, "alpha123");
+ httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP"
+ + PolicyProperties.PROPERTY_MANAGED_SUFFIX, "true");
+
+ final ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties);
+ assertTrue(servers.size() == 2);
+
+ final ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties);
+ assertTrue(clients.size() == 2);
+
+ for (final HttpServletServer server : servers) {
+ server.waitedStart(10000);
}
+ final HttpClient clientPAP = HttpClient.factory.get("PAP");
+ final Response response = clientPAP.get();
+ assertTrue(response.getStatus() == 200);
+
+ final HttpClient clientPDP = HttpClient.factory.get("PDP");
+ final Response response2 = clientPDP.get("test");
+ assertTrue(response2.getStatus() == 500);
+ }
+
}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java b/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java
index 5ac17253..e217ee7d 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/persistence/FileSystemPersistence.java
@@ -164,7 +164,7 @@ public class FileSystemPersistence implements SystemPersistence {
this.backupController(controllerName);
}
} catch (final Exception e) {
- logger.info("{}: no existing {} properties", this, controllerName);
+ logger.info("{}: no existing {} properties {}", this, controllerName, e);
// continue
}
}
diff --git a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java
index ffcb35cd..48eedfa5 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/server/restful/RestManager.java
@@ -49,6 +49,7 @@ import org.onap.policy.drools.event.comm.TopicSink;
import org.onap.policy.drools.event.comm.TopicSource;
import org.onap.policy.drools.event.comm.bus.DmaapTopicSink;
import org.onap.policy.drools.event.comm.bus.DmaapTopicSource;
+import org.onap.policy.drools.event.comm.bus.NoopTopicSink;
import org.onap.policy.drools.event.comm.bus.UebTopicSink;
import org.onap.policy.drools.event.comm.bus.UebTopicSource;
import org.onap.policy.drools.features.PolicyControllerFeatureAPI;
@@ -1645,7 +1646,7 @@ public class RestManager {
@Path("engine/topics/sinks/ueb")
@ApiOperation(value = "Retrieves the UEB managed topic sinks",
notes = "UEB Topic Sinks Agregation", responseContainer = "List",
- response = UebTopicSource.class)
+ response = UebTopicSink.class)
public Response uebSinks() {
return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getUebTopicSinks())
.build();
@@ -1763,6 +1764,36 @@ public class RestManager {
}
@GET
+ @Path("engine/topics/sinks/noop")
+ @ApiOperation(value = "Retrieves the NOOP managed topic sinks",
+ notes = "NOOP Topic Sinks Agregation", responseContainer = "List",
+ response = NoopTopicSink.class)
+ public Response noopSinks() {
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getNoopTopicSinks())
+ .build();
+ }
+
+ @GET
+ @Path("engine/topics/sinks/noop/{topic}")
+ @ApiOperation(value = "Retrieves a NOOP managed topic sink",
+ notes = "NOOP is an dev/null Network Communicaton Sink", response = NoopTopicSink.class)
+ public Response noopSinkTopic(
+ @ApiParam(value = "Topic Name", required = true) @PathParam("topic") String topic) {
+ return Response.status(Response.Status.OK).entity(TopicEndpoint.manager.getNoopTopicSink(topic))
+ .build();
+ }
+
+ @GET
+ @Path("engine/topics/sinks/noop/{topic}/events")
+ @ApiOperation(value = "Retrieves the latest events send through a NOOP topic",
+ notes = "NOOP is an dev/null Network Communicaton Sink", responseContainer = "List")
+ public Response noopSinkEvents(@PathParam("topic") String topic) {
+ return Response.status(Status.OK)
+ .entity(Arrays.asList(TopicEndpoint.manager.getNoopTopicSink(topic).getRecentEvents()))
+ .build();
+ }
+
+ @GET
@Path("engine/topics/sources/ueb/{topic}/switches")
@ApiOperation(value = "UEB Topic Control Switches",
notes = "List of the UEB Topic Control Switches", responseContainer = "List")
diff --git a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
index a262352b..b2b2df6d 100644
--- a/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
+++ b/policy-management/src/main/java/org/onap/policy/drools/system/PolicyEngine.java
@@ -760,7 +760,7 @@ class PolicyEngineManager implements PolicyEngine {
for (final HttpServletServer httpServer : this.httpServers) {
try {
- if (!httpServer.waitedStart(5 * 1000L))
+ if (!httpServer.waitedStart(10 * 1000L))
success = false;
} catch (final Exception e) {
logger.error("{}: cannot start http-server {} because of {}", this, httpServer,
@@ -998,6 +998,7 @@ class PolicyEngineManager implements PolicyEngine {
Thread.sleep(5000L);
} catch (final InterruptedException e) {
logger.warn("{}: interrupted-exception while shutting down management server: ", this);
+ Thread.currentThread().interrupt();
}
System.exit(0);
diff --git a/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java b/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java
index f6c837f5..bd5b8aac 100644
--- a/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java
+++ b/policy-utils/src/main/java/org/onap/policy/drools/utils/NetworkUtil.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,16 +20,51 @@
package org.onap.policy.drools.utils;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.Socket;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Network Utilities
*/
public class NetworkUtil {
-
- /**
- * IPv4 Wildcard IP address
- */
- public static final String IPv4_WILDCARD_ADDRESS = "0.0.0.0";
-
- /* Other methods will be added as needed */
+
+ public static final Logger logger = LoggerFactory.getLogger(NetworkUtil.class.getName());
+
+ /**
+ * IPv4 Wildcard IP address
+ */
+ public static final String IPv4_WILDCARD_ADDRESS = "0.0.0.0";
+
+
+ /**
+ * try to connect to $host:$port $retries times while we are getting connection failures.
+ *
+ * @param host host
+ * @param port port
+ * @param retries number of attempts
+ * @return true is port is open, false otherwise
+ * @throws InterruptedException if execution has been interrupted
+ */
+ public static boolean isTcpPortOpen(String host, int port, int retries, long interval)
+ throws InterruptedException, IOException {
+ int retry = 0;
+ while (retry < retries) {
+ try (Socket s = new Socket(host, port)) {
+ logger.debug("{}:{} connected - retries={} interval={}", host, port, retries, interval);
+ return true;
+ } catch (final ConnectException e) {
+ retry++;
+ logger.trace("{}:{} connected - retries={} interval={}", host, port, retries, interval, e);
+ Thread.sleep(interval);
+ }
+ }
+
+ logger.warn("{}:{} closed = retries={} interval={}", host, port, retries, interval);
+ return false;
+ }
}
diff --git a/policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java b/policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java
new file mode 100644
index 00000000..1af831ad
--- /dev/null
+++ b/policy-utils/src/test/java/org/onap/policy/drools/utils/PairTripleTest.java
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-utils
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.utils;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+public class PairTripleTest {
+
+ @Test
+ public void pairTest() {
+ Pair<String, String> p = new Pair<String, String>("foo", "bar");
+
+ assertEquals(p.first(),"foo");
+ assertEquals(p.second(),"bar");
+ assertEquals(p.getFirst(),"foo");
+ assertEquals(p.getSecond(),"bar");
+
+ p.first("one");
+ p.second("two");
+
+ assertEquals(p.first(),"one");
+ assertEquals(p.second(),"two");
+ assertEquals(p.getFirst(),"one");
+ assertEquals(p.getSecond(),"two");
+
+ }
+
+ @Test
+ public void tripleTest() {
+ Triple<String, String, String> t = new Triple<String, String,String>("foo", "bar", "fiz");
+
+ assertEquals(t.first(),"foo");
+ assertEquals(t.second(),"bar");
+ assertEquals(t.third(),"fiz");
+
+ t.first("one");
+ t.second("two");
+ t.third("three");
+
+ assertEquals(t.first(),"one");
+ assertEquals(t.second(),"two");
+ assertEquals(t.third(),"three");
+ }
+
+}
diff --git a/policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java b/policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java
new file mode 100644
index 00000000..ab4bace5
--- /dev/null
+++ b/policy-utils/src/test/java/org/onap/policy/drools/utils/ReflectionUtilTest.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-utils
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.drools.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+public class ReflectionUtilTest {
+
+ public class ParentClass {
+
+ }
+
+ public class ChildClass extends ParentClass{
+
+ }
+
+ @Test
+ public void reflectionTest() {
+
+ try {
+
+ Class<?> class1 = Class.forName("org.onap.policy.drools.utils.ReflectionUtil");
+
+ ClassLoader classLoader = class1.getClassLoader();
+
+ Class<?> class2 = ReflectionUtil.fetchClass(classLoader, "org.onap.policy.drools.utils.ReflectionUtil");
+
+
+ assertTrue(ReflectionUtil.isClass(classLoader, "org.onap.policy.drools.utils.ReflectionUtil"));
+ assertEquals(class1,class2);
+ assertTrue(ReflectionUtil.isSubclass(ParentClass.class, ChildClass.class));
+ assertFalse(ReflectionUtil.isSubclass(ChildClass.class, ParentClass.class));
+
+
+ } catch (ClassNotFoundException e) {
+ fail();
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 3765f933..35aca8e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,9 +43,9 @@
<common-modules.version>1.1.0-SNAPSHOT</common-modules.version>
<dmaap.version>0.2.12</dmaap.version>
<cambria.version>0.0.1</cambria.version>
- <jersey.version>2.22.2</jersey.version>
- <jersey.swagger.version>1.5.13</jersey.swagger.version>
- <jackson.version>2.8.4</jackson.version>
+ <jersey.version>2.25.1</jersey.version>
+ <jersey.swagger.version>1.5.16</jersey.swagger.version>
+ <jackson.version>2.9.1</jackson.version>
<http.client.version>4.5.2</http.client.version>
<http.core.version>4.4.4</http.core.version>
<logback.version>1.2.3</logback.version>
@@ -55,7 +55,7 @@
<hibernate.core.version>5.2.10.Final</hibernate.core.version>
<hibernate.commons.annotations.version>5.0.1.Final</hibernate.commons.annotations.version>
<commons.io.version>2.5</commons.io.version>
- <guava.version>16.0.1</guava.version>
+ <guava.version>23.0</guava.version>
<nexusproxy>https://nexus.onap.org</nexusproxy>
<sitePath>/content/sites/site/${project.groupId}/${project.artifactId}/${project.version}</sitePath>
@@ -73,6 +73,8 @@
<module>feature-eelf</module>
<module>feature-session-persistence</module>
<module>feature-test-transaction</module>
+ <module>api-state-management</module>
+ <module>feature-state-management</module>
<module>packages</module>
</modules>