summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJessica Wagantall <jwagantall@linuxfoundation.org>2018-08-13 23:42:40 +0000
committerRamaSubbaReddy <rama.subba.reddy.s@huawei.com>2018-10-03 12:08:26 +0530
commit11a3345cf03c2ad820fa40440dbe4c89eb963b26 (patch)
tree24a5be90c240d49d553ec79c267729c9e976e1dc
parentf9cbcb5f93fa07852ea06cffe6eac6bc09c53ce1 (diff)
Add RestConf Collector
Issue-ID: DCAEGEN2-612 1. Instantiated to support CCVPN Close Loop Use Case 2. In general, this supports data collection from all PNF or devices that supports RestConf protocol Change-Id: I6311ad618e8d68badc5423a63d7781a19dc62829 Signed-off-by: rama-huawei <rama.subba.reddy.s@huawei.com>
-rwxr-xr-xetc/DmaapConfig.json12
-rwxr-xr-xetc/collector.properties22
-rwxr-xr-xetc/establish-subscription-input-template.json5
-rwxr-xr-xetc/log4j.xml188
-rwxr-xr-xetc/passwordfile1
-rwxr-xr-xpom.xml332
-rwxr-xr-xrestconf.iml107
-rwxr-xr-xsrc/assembly/dep.xml43
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java127
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java43
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/Constants.java38
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java51
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java88
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/Format.java37
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java48
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java30
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java92
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java52
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java66
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java51
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java224
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java600
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java27
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java58
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java53
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java412
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java178
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java107
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java75
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java63
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java110
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java35
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java95
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java48
-rwxr-xr-xsrc/main/scripts/docker_entry.sh6
-rwxr-xr-xsrc/main/scripts/restConfCollector.sh100
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java111
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java77
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java92
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java62
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java75
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java69
-rwxr-xr-xsrc/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java66
-rwxr-xr-xsrc/test/resources/RestConfEvent.json26
-rwxr-xr-xsrc/test/resources/testParseDMaaPCredentialsGen2.json21
-rwxr-xr-xsrc/test/resources/testParseDMaaPCredentialsLegacy.json26
-rwxr-xr-xsrc/test/resources/testParseDMaaPGen2.json12
-rwxr-xr-xsrc/test/resources/testParseDMaaPLegacy.json21
48 files changed, 4282 insertions, 0 deletions
diff --git a/etc/DmaapConfig.json b/etc/DmaapConfig.json
new file mode 100755
index 0000000..5176cea
--- /dev/null
+++ b/etc/DmaapConfig.json
@@ -0,0 +1,12 @@
+{
+ "channels": [
+ {
+ "name": "route_failure",
+ "cambria.topic": "unauthenticated.DCAE_RESTCONF_COL_OUTPUT",
+ "class": "HpCambriaOutputStream",
+ "stripHpId": "true",
+ "type": "out",
+ "cambria.hosts": "onap-message-router"
+ }
+ ]
+}
diff --git a/etc/collector.properties b/etc/collector.properties
new file mode 100755
index 0000000..a013578
--- /dev/null
+++ b/etc/collector.properties
@@ -0,0 +1,22 @@
+###############################################################################
+##
+## Collector config
+##
+## - Default values are shown as commented settings.
+##
+###############################################################################
+## Processing
+collector.dmaapfile=./etc/DmaapConfig.json
+###############################################################################
+##
+## Tomcat control
+##
+#tomcat.maxthreads=(tomcat default, which is usually 200)
+# list all restconf collector parameters
+templateFileName=./etc/establish-subscription-input-template.json
+restapiUrl=10.0.4.1:8080;10.0.4.2:8080
+httpMethod=post
+responsePrefix=restapi-result
+skipSending=false
+sseConnectURL=http://10.0.4.1:8080/RestConfServer/rest/ssevents; http://10.0.4.2:8080/RestConfServer/rest/ssevents
+format=json
diff --git a/etc/establish-subscription-input-template.json b/etc/establish-subscription-input-template.json
new file mode 100755
index 0000000..c47ba01
--- /dev/null
+++ b/etc/establish-subscription-input-template.json
@@ -0,0 +1,5 @@
+{
+ "ietf-subscribed-notification:input": {
+ "encoding": "encoding-json"
+ }
+}
diff --git a/etc/log4j.xml b/etc/log4j.xml
new file mode 100755
index 0000000..3e3b132
--- /dev/null
+++ b/etc/log4j.xml
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2018. Lorem ipsum dolor sit amet, consectetur adipiscing elit.
+ ~ Morbi non lorem porttitor neque feugiat blandit. Ut vitae ipsum eget quam lacinia accumsan.
+ ~ Etiam sed turpis ac ipsum condimentum fringilla. Maecenas magna.
+ ~ Proin dapibus sapien vel ante. Aliquam erat volutpat. Pellentesque sagittis ligula eget metus.
+ ~ Vestibulum commodo. Ut rhoncus gravida arcu.
+ -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="threshold" value="INFO"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%-10t]%m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="INFO"/>
+ <param name="File" value="logs/collector.log"/>
+ <param name="MaxFileSize" value="32MB"/>
+ <param name="MaxBackupIndex" value="20"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%-10t][%-5c][%4L]%m%n" / -->
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%-10t][%-5c]%m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="IFILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="INFO"/>
+ <param name="File" value="logs/input.log"/>
+ <param name="MaxFileSize" value="32MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%-10t][%-5c][%4L]%m%n" / -->
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%-10t][%-5c]%m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="OFILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="INFO"/>
+ <param name="File" value="logs/output.log"/>
+ <param name="MaxFileSize" value="32MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%-10t][%-5c][%4L]%m%n" / -->
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%-10t][%-5c]%m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="EFILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="INFO"/>
+ <param name="File" value="logs/error.log"/>
+ <param name="MaxFileSize" value="32MB"/>
+ <param name="MaxBackupIndex" value="5"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- param name="ConversionPattern" value="[%d{ABSOLUTE}][%-5p][%-10t][%-5c][%4L]%m%n" / -->
+ <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%-10t][%-5c]%m%n"/>
+ </layout>
+ </appender>
+
+ <!--
+ ECOMP logging setup
+
+ NOTES:
+
+ 1. files are written to "./logs/<filename>". You must setup the environment
+ so that ./logs is a symlink to the correct location according to the ECOMP
+ log standard. For example, "/opt/logs/DCAE/highlandParkVcScope". If that's
+ not possible, change the File setting in each appender appropriately.
+ -->
+
+ <appender name="ECOMP_AUDIT" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="DEBUG"/>
+ <param name="File" value="./logs/ecomp/audit.log"/>
+ <param name="MaxFileSize" value="128MB"/>
+ <param name="MaxBackupIndex" value="20"/>
+ <layout class="com.att.nsa.logging.log4j.EcompLayout">
+ <param name="ConversionPattern" value="ECOMP_AUDIT"/>
+ </layout>
+ </appender>
+
+ <appender name="ECOMP_METRIC" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="INFO"/>
+ <param name="File" value="./logs/ecomp/metric.log"/>
+ <param name="MaxFileSize" value="128MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="com.att.nsa.logging.log4j.EcompLayout">
+ <param name="ConversionPattern" value="ECOMP_METRIC"/>
+ </layout>
+ </appender>
+
+ <appender name="ECOMP_ERROR" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="WARN"/> <!-- only WARN and ERROR are allowed in this log -->
+ <param name="File" value="./logs/ecomp/error.log"/>
+ <param name="MaxFileSize" value="128MB"/>
+ <param name="MaxBackupIndex" value="10"/>
+ <layout class="com.att.nsa.logging.log4j.EcompLayout">
+ <param name="ConversionPattern" value="ECOMP_ERROR"/>
+ </layout>
+ </appender>
+
+ <appender name="ECOMP_DEBUG" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="DEBUG"/>
+ <param name="File" value="./logs/ecomp/debug.log"/>
+ <param name="MaxFileSize" value="128MB"/>
+ <param name="MaxBackupIndex" value="20"/>
+ <layout class="com.att.nsa.logging.log4j.EcompLayout">
+ <param name="ConversionPattern" value="ECOMP_DEBUG"/>
+ </layout>
+ </appender>
+
+ <logger name="org.onap.restconf.common.input" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="IFILE"/>
+ </logger>
+
+ <logger name="org.onap.restconf.common.output" additivity="false">
+ <level value="INFO"/>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="OFILE"/>
+ </logger>
+
+ <logger name="org.onap.restconf.common.error" additivity="false">
+ <level value="DEBUG"/>
+ <appender-ref ref="EFILE"/>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ECOMP_ERROR"/>
+ </logger>
+
+ <!--
+ The ECOMP logging standard has four specific classes of logging that are
+ unrelated to subsystem logger names. If you want them activated, uncomment
+ this block.
+-->
+ <logger name="com.att.ecomp.audit" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ECOMP_AUDIT"/>
+ </logger>
+
+ <logger name="com.att.ecomp.metrics" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="ECOMP_METRIC"/>
+ </logger>
+
+ <logger name="com.att.ecomp.error" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="ECOMP_ERROR"/>
+ </logger>
+
+ <logger name="com.att.ecomp.debug" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="ECOMP_DEBUG"/>
+ </logger>
+
+ <logger name="org.onap.dcae.commonFunction.EventPublisher" additivity="false">
+ <level value="debug"/>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+
+ <logger name="com.att.nsa.apiClient.http.HttpClient" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ </logger>
+
+ <logger name="com.att.nsa.cambria.client.impl.CambriaSimplerBatchPublisher" additivity="false">
+ <level value="info"/>
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ </logger>
+
+ <root>
+ <level value="DEBUG"/>
+ <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ECOMP_AUDIT"/>
+ <appender-ref ref="ECOMP_DEBUG"/>
+ <appender-ref ref="ECOMP_ERROR"/>
+ </root>
+
+</log4j:configuration>
diff --git a/etc/passwordfile b/etc/passwordfile
new file mode 100755
index 0000000..702a4cb
--- /dev/null
+++ b/etc/passwordfile
@@ -0,0 +1 @@
+collector
diff --git a/pom.xml b/pom.xml
new file mode 100755
index 0000000..f0efa8f
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,332 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.onap.oparent</groupId>
+ <artifactId>oparent</artifactId>
+ <version>1.1.0</version>
+ <relativePath/>
+ </parent>
+
+ <groupId>org.onap.dcaegen2.collectors.restconf</groupId>
+ <artifactId>restconfcollector</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <name>dcaegen2-collectors-restconf</name>
+ <description>RestConfCollector</description>
+
+ <properties>
+ <!-- PROJECT SETTINGS -->
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <java.version>8</java.version>
+ <docker.image.name>restconfcollector</docker.image.name>
+
+ <!-- PLUGIN SETTINGS -->
+ <dependency.locations.enabled>false</dependency.locations.enabled>
+
+ </properties>
+
+ <build>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.7.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>3.1.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.0</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>3.0.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <version>2.9</version>
+ </plugin>
+ <plugin>
+ <groupId>com.spotify</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>1.1.1</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>${java.version}</source>
+ <target>${java.version}</target>
+ <showWarnings>true</showWarnings>
+ <showDeprecation>true</showDeprecation>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <configuration>
+ <excludeResources>true</excludeResources>
+ </configuration>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>jar-no-fork</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ </manifest>
+ <manifestEntries>
+ <Implementation-Build-Version>${project.version}</Implementation-Build-Version>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/assembly/dep.xml</descriptor>
+ </descriptors>
+ <attach>false</attach>
+ <appendAssemblyId>false</appendAssemblyId>
+ <updateOnly>true</updateOnly>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <!-- minimize console output messages -->
+ <quiet>true</quiet>
+ <verbose>false</verbose>
+ <useStandardDocletOptions>false</useStandardDocletOptions>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <phase>site</phase>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>attach-javadoc</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <reporting>
+ <plugins>
+ <plugin>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+ <reportSets>
+ <reportSet>
+ <reports>
+ <report>dependencies</report>
+ <report>license</report>
+ </reports>
+ </reportSet>
+ </reportSets>
+ </plugin>
+
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <failOnError>false</failOnError>
+ <doclet>org.umlgraph.doclet.UmlGraphDoc</doclet>
+ <docletArtifact>
+ <groupId>org.umlgraph</groupId>
+ <artifactId>umlgraph</artifactId>
+ <version>5.6</version>
+ </docletArtifact>
+ <additionalparam>-views</additionalparam>
+ <useStandardDocletOptions>true</useStandardDocletOptions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </reporting>
+
+ <dependencies>
+
+ <!-- JSON RELATED -->
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-schema-validator</artifactId>
+ <version>2.2.6</version>
+ </dependency>
+ <dependency>
+ <groupId>com.github.fge</groupId>
+ <artifactId>json-schema-core</artifactId>
+ <version>1.2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.json</groupId>
+ <artifactId>json</artifactId>
+ <version>20160810</version>
+ </dependency>
+
+ <!-- REST API RELATED -->
+ <dependency>
+ <groupId>com.att.nsa</groupId>
+ <artifactId>nsaServerLibrary</artifactId>
+ <version>1.0.10</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.8.11</version>
+ </dependency>
+
+ <!-- LOGGING RELATED -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.21</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>apache-log4j-extras</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+
+ <!-- MISCELLANEOUS -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>3.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.10</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.mail</groupId>
+ <artifactId>mail</artifactId>
+ <version>1.4.7</version>
+ </dependency>
+ <dependency>
+ <groupId>io.vavr</groupId>
+ <artifactId>vavr</artifactId>
+ <version>0.9.2</version>
+ </dependency>
+ <!-- TESTING -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.18.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
+ <version>1.3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>2.27</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.containers</groupId>
+ <artifactId>jersey-container-grizzly2-http</artifactId>
+ <version>2.27</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <version>2.27</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-sse</artifactId>
+ <version>2.27</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs.jersey-oauth</groupId>
+ <artifactId>oauth-client</artifactId>
+ <version>1.19.1</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs.jersey-oauth</groupId>
+ <artifactId>oauth-signature</artifactId>
+ <version>1.19.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>1.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.8.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <repositories>
+ <repository>
+ <id>external-repository</id>
+ <url>https://oss.sonatype.org/content/repositories</url>
+ </repository>
+ </repositories>
+</project> \ No newline at end of file
diff --git a/restconf.iml b/restconf.iml
new file mode 100755
index 0000000..0267434
--- /dev/null
+++ b/restconf.iml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
+ <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
+ <output url="file://$MODULE_DIR$/target/classes" />
+ <output-test url="file://$MODULE_DIR$/target/test-classes" />
+ <content url="file://$MODULE_DIR$">
+ <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
+ <sourceFolder url="file://$MODULE_DIR$/src/test/resources" type="java-test-resource" />
+ <excludeFolder url="file://$MODULE_DIR$/target" />
+ </content>
+ <orderEntry type="inheritedJdk" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ <orderEntry type="library" name="Maven: com.googlecode.json-simple:json-simple:1.1.1" level="project" />
+ <orderEntry type="library" name="Maven: junit:junit:4.10" level="project" />
+ <orderEntry type="library" name="Maven: com.github.fge:json-schema-validator:2.2.6" level="project" />
+ <orderEntry type="library" name="Maven: com.google.code.findbugs:jsr305:3.0.0" level="project" />
+ <orderEntry type="library" name="Maven: joda-time:joda-time:2.3" level="project" />
+ <orderEntry type="library" name="Maven: com.googlecode.libphonenumber:libphonenumber:6.2" level="project" />
+ <orderEntry type="library" name="Maven: javax.mail:mailapi:1.4.3" level="project" />
+ <orderEntry type="library" name="Maven: net.sf.jopt-simple:jopt-simple:4.6" level="project" />
+ <orderEntry type="library" name="Maven: com.github.fge:json-schema-core:1.2.5" level="project" />
+ <orderEntry type="library" name="Maven: com.github.fge:uri-template:0.9" level="project" />
+ <orderEntry type="library" name="Maven: com.github.fge:msg-simple:1.1" level="project" />
+ <orderEntry type="library" name="Maven: com.github.fge:btf:1.2" level="project" />
+ <orderEntry type="library" name="Maven: com.github.fge:jackson-coreutils:1.8" level="project" />
+ <orderEntry type="library" name="Maven: org.mozilla:rhino:1.7R4" level="project" />
+ <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.3.1" level="project" />
+ <orderEntry type="library" name="Maven: org.json:json:20160810" level="project" />
+ <orderEntry type="library" name="Maven: com.att.nsa:nsaServerLibrary:1.0.10" level="project" />
+ <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.7.5" level="project" />
+ <orderEntry type="library" name="Maven: com.att.nsa:saToolkit:0.0.1" level="project" />
+ <orderEntry type="library" name="Maven: jline:jline:2.12.1" level="project" />
+ <orderEntry type="library" name="Maven: com.att.nsa:cambriaClient:0.0.1" level="project" />
+ <orderEntry type="library" name="Maven: com.att.nsa:saClientLibrary:0.0.1" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient:4.5" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpcore:4.4.1" level="project" />
+ <orderEntry type="library" name="Maven: commons-codec:commons-codec:1.9" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.httpcomponents:httpclient-cache:4.5" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.velocity:velocity:1.7" level="project" />
+ <orderEntry type="library" name="Maven: commons-io:commons-io:2.4" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-catalina:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-servlet-api:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-jsp-api:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-el-api:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-juli:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-annotations-api:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-api:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-jni:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-coyote:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-util-scan:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat:tomcat-util:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.tomcat.embed:tomcat-embed-core:8.0.36" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-databind:2.8.11" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-annotations:2.8.0" level="project" />
+ <orderEntry type="library" name="Maven: com.fasterxml.jackson.core:jackson-core:2.8.10" level="project" />
+ <orderEntry type="library" name="Maven: org.slf4j:slf4j-log4j12:1.7.21" level="project" />
+ <orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
+ <orderEntry type="library" name="Maven: log4j:apache-log4j-extras:1.2.17" level="project" />
+ <orderEntry type="library" name="Maven: com.google.guava:guava:18.0" level="project" />
+ <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.2" level="project" />
+ <orderEntry type="library" name="Maven: commons-configuration:commons-configuration:1.10" level="project" />
+ <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.6" level="project" />
+ <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
+ <orderEntry type="library" name="Maven: javax.mail:mail:1.4.7" level="project" />
+ <orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" />
+ <orderEntry type="library" name="Maven: io.vavr:vavr:0.9.2" level="project" />
+ <orderEntry type="library" name="Maven: io.vavr:vavr-match:0.9.2" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: org.mockito:mockito-core:2.18.0" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy:1.8.3" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: net.bytebuddy:byte-buddy-agent:1.8.3" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: org.objenesis:objenesis:2.6" level="project" />
+ <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.7" level="project" />
+ <orderEntry type="library" name="Maven: org.codehaus.jettison:jettison:1.3.7" level="project" />
+ <orderEntry type="library" name="Maven: stax:stax-api:1.0.1" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-client:2.27" level="project" />
+ <orderEntry type="library" name="Maven: javax.ws.rs:javax.ws.rs-api:2.1" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-common:2.27" level="project" />
+ <orderEntry type="library" name="Maven: javax.annotation:javax.annotation-api:1.2" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.hk2:osgi-resource-locator:1.0.1" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.hk2.external:javax.inject:2.5.0-b42" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.containers:jersey-container-grizzly2-http:2.27" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.grizzly:grizzly-http-server:2.4.0" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.grizzly:grizzly-http:2.4.0" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.grizzly:grizzly-framework:2.4.0" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.core:jersey-server:2.27" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.media:jersey-media-jaxb:2.27" level="project" />
+ <orderEntry type="library" name="Maven: javax.validation:validation-api:1.1.0.Final" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.inject:jersey-hk2:2.27" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-locator:2.5.0-b42" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.hk2.external:aopalliance-repackaged:2.5.0-b42" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-api:2.5.0-b42" level="project" />
+ <orderEntry type="library" name="Maven: javax.inject:javax.inject:1" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.hk2:hk2-utils:2.5.0-b42" level="project" />
+ <orderEntry type="library" name="Maven: org.javassist:javassist:3.22.0-CR2" level="project" />
+ <orderEntry type="library" name="Maven: org.glassfish.jersey.media:jersey-media-sse:2.27" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.jersey.contribs.jersey-oauth:oauth-client:1.19.1" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.jersey:jersey-client:1.19.1" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.jersey.contribs.jersey-oauth:oauth-signature:1.19.1" level="project" />
+ <orderEntry type="library" name="Maven: com.sun.jersey:jersey-core:1.19.1" level="project" />
+ <orderEntry type="library" name="Maven: javax.ws.rs:jsr311-api:1.1.1" level="project" />
+ <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-library:1.3" level="project" />
+ <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
+ <orderEntry type="library" scope="TEST" name="Maven: org.assertj:assertj-core:3.8.0" level="project" />
+ </component>
+</module> \ No newline at end of file
diff --git a/src/assembly/dep.xml b/src/assembly/dep.xml
new file mode 100755
index 0000000..472d966
--- /dev/null
+++ b/src/assembly/dep.xml
@@ -0,0 +1,43 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+
+ <id>bundle</id>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>dir</format>
+ </formats>
+
+ <fileSets>
+ <fileSet>
+ <directory>src/main/scripts</directory>
+ <outputDirectory>bin</outputDirectory>
+ <includes>
+ <include>**/*.*</include>
+ </includes>
+ <fileMode>0755</fileMode>
+ <lineEnding>unix</lineEnding>
+ </fileSet>
+ <fileSet>
+ <directory>etc</directory>
+ <outputDirectory>etc</outputDirectory>
+ </fileSet>
+ <fileSet>
+ <directory>src/main/resources</directory>
+ <includes>
+ <include>**/*.conf</include>
+ </includes>
+ <outputDirectory>etc</outputDirectory>
+ </fileSet>
+
+ </fileSets>
+
+ <dependencySets>
+ <dependencySet>
+ <scope>runtime</scope>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>false</unpack>
+ <outputDirectory>lib</outputDirectory>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java b/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java
new file mode 100755
index 0000000..044a9cf
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java
@@ -0,0 +1,127 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Set;
+import io.vavr.control.Option;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.stream.StreamSupport;
+
+import static io.vavr.API.Set;
+
+public class AnyNode {
+ private Object obj;
+
+ private AnyNode(Object object) {
+ this.obj = object;
+ }
+
+ public static AnyNode fromString(String content) {
+ return new AnyNode(new JSONObject(content));
+ }
+
+ /**
+ * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject.
+ *
+ * @return key set of underlying objects
+ */
+ public Set<String> keys() {
+ return Set(asJsonObject().keySet().toArray(new String[]{}));
+ }
+
+ /**
+ * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type
+ * org.json.JSONObject.
+ *
+ * @param key for querying value from jsonobject
+ * @return value associated with specified key
+ */
+ public AnyNode get(String key) {
+ return new AnyNode(asJsonObject().get(key));
+ }
+
+ /**
+ * Returns string representation of this. If it happens to have null, the value is treated as
+ * org.json.JSONObject.NULL and "null" string is returned then.
+ *
+ * @return string representation of this
+ */
+ public String toString() {
+ return this.obj.toString();
+ }
+
+ /**
+ * Returns optional of object under specified key, wrapped with AnyNode object.
+ * If underlying object is not of type org.json.JSONObject
+ * or underlying object has no given key
+ * or given key is null
+ * then Optional.empty will be returned.
+ *
+ * @param key for querying value from AnyNode object
+ * @return optional of object under specified key
+ */
+ public Option<AnyNode> getAsOption(String key) {
+ try {
+ AnyNode value = get(key);
+ if (value.toString().equals("null")) {
+ return Option.none();
+ }
+ return Option.some(value);
+ } catch (JSONException ex) {
+ return Option.none();
+ }
+ }
+
+ /**
+ * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that
+ * underlying object is of type org.json.JSONObject.
+ *
+ * @return converts underlying object to map representation
+ */
+ public List<AnyNode> toList() {
+ return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new));
+ }
+
+ /**
+ * Checks if specified key is present in this. It is assumed that this is of type JSONObject.
+ *
+ * @param key is used to check presence in anynode object
+ * @return true if specified key is present in this
+ */
+ public boolean has(String key) {
+ return !getAsOption(key).isEmpty();
+ }
+
+ /**
+ * Returns as JSONObject.
+ *
+ * @return jsonobject
+ */
+ private JSONObject asJsonObject() {
+ return (JSONObject) this.obj;
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java b/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java
new file mode 100755
index 0000000..47ae8ac
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common;
+
+public enum AuthType {
+ NONE, BASIC, DIGEST, OAUTH, Unspecified;
+
+ public static AuthType fromString(String s) {
+ if ("basic".equalsIgnoreCase(s)) {
+ return BASIC;
+ }
+ if ("digest".equalsIgnoreCase(s)) {
+ return DIGEST;
+ }
+ if ("oauth".equalsIgnoreCase(s)) {
+ return OAUTH;
+ }
+ if ("none".equalsIgnoreCase(s)) {
+ return NONE;
+ }
+ if ("unspecified".equalsIgnoreCase(s)) {
+ return Unspecified;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java
new file mode 100755
index 0000000..4845bfc
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+public class Constants {
+ public static final String KDEFAULT_TEMP_FILENAME = "templateFileName";
+ public static final String KSETTING_REST_API_URL = "restapiUrl";
+ public static final String KSETTING_HTTP_METHOD = "httpMethod";
+ public static final String KSETTING_RESP_PREFIX = "responsePrefix";
+ public static final String KSETTING_SKIP_SENDING = "skipSending";
+ public static final String KSETTING_FORMAT = "format";
+ public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
+ public static final String[] KDEFAULT_DMAAPCONFIGS = new String[]{"./etc/DmaapConfig.json"};
+ public static final String KSETTING_SSE_CONNECT_URL = "sseConnectURL";
+ public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
+ public static final String RESPONSE_CODE = "restapi-result.response-code";
+ public static final String OUTPUT_IDENTIFIER = "restapi-result.ietf-subscribed-notifications:output.identifier";
+ public static final String RESPONSE_CODE_200 = "200";
+ public static final String KCONFIG = "c";
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java b/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java
new file mode 100755
index 0000000..97ee623
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataChangeEventListener implements EventListener {
+ private static final Logger log = LoggerFactory.getLogger(DataChangeEventListener.class);
+ private RestConfContext ctx;
+
+ public DataChangeEventListener(RestConfContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void onEvent(InboundEvent event) {
+ JSONArray jsonArrayMod;
+ log.info("On SSE Event is received");
+ String s = event.readData();
+ JSONObject jsonObj = new JSONObject(s);
+ jsonArrayMod = new JSONArray().put(jsonObj);
+ try {
+ RestConfProc.handleEvents(jsonArrayMod);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java b/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java
new file mode 100755
index 0000000..3409f9c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java
@@ -0,0 +1,88 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+
+import org.json.JSONObject;
+import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventProcessor implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+
+ static Map<String, String[]> streamidHash = new HashMap<>();
+ public JSONObject event;
+ private EventPublisher eventPublisher;
+
+ public EventProcessor(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
+ streamidHash = parseStreamIdToStreamHashMapping(new RestConfProc().streamID);
+ }
+
+ private Map<String, String[]> parseStreamIdToStreamHashMapping(String streamId) {
+ Map<String, String[]> streamidHash = new HashMap<>();
+ String[] list = streamId.split("\\|");
+ for (String aList : list) {
+ String domain = aList.split("=")[0];
+ String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
+ streamidHash.put(domain, streamIdList);
+ }
+ return streamidHash;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ while (true) {
+ event = RestConfProc.fProcessingInputQueue.take();
+ // As long as the producer is running we remove elements from
+ // the queue.
+ log.info("QueueSize:" + RestConfProc.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " +
+ event);
+ String[] streamIdList = streamidHash.get("route");
+ log.debug("streamIdList:" + Arrays.toString(streamIdList));
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ } else {
+ sendEventsToStreams(streamIdList);
+ }
+ log.debug("Event published" + event);
+ }
+ } catch (Exception e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void sendEventsToStreams(String[] streamIdList) {
+ for (String aStreamIdList : streamIdList) {
+ log.info("Invoking publisher for streamId:" + aStreamIdList);
+ eventPublisher.sendEvent(event, aStreamIdList);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java
new file mode 100755
index 0000000..94344d6
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common;
+
+public enum Format {
+ JSON, XML, NONE;
+
+ public static Format fromString(String s) {
+ if ("json".equalsIgnoreCase(s)) {
+ return JSON;
+ }
+ if ("xml".equalsIgnoreCase(s)) {
+ return XML;
+ }
+ if ("none".equalsIgnoreCase(s)) {
+ return NONE;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java
new file mode 100755
index 0000000..d85d5f5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+package org.onap.dcae.collectors.restconf.common;
+
+public enum HttpMethod {
+ GET, POST, PUT, DELETE, PATCH;
+
+ public static HttpMethod fromString(String s) {
+ if (s == null) {
+ return null;
+ }
+ if (s.equalsIgnoreCase("get")) {
+ return GET;
+ }
+ if (s.equalsIgnoreCase("post")) {
+ return POST;
+ }
+ if (s.equalsIgnoreCase("put")) {
+ return PUT;
+ }
+ if (s.equalsIgnoreCase("delete")) {
+ return DELETE;
+ }
+ if (s.equalsIgnoreCase("patch")) {
+ return PATCH;
+ }
+ throw new IllegalArgumentException("Invalid value for HTTP Method: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java
new file mode 100755
index 0000000..e1b97da
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+public class HttpResponse {
+ public int code;
+ public String message;
+ public String body;
+ public MultivaluedMap<String, String> headers;
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java
new file mode 100755
index 0000000..f29bbc3
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java
@@ -0,0 +1,92 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class JsonParser {
+
+ private static final Logger log = LoggerFactory.getLogger(JsonParser.class);
+
+ private JsonParser() {
+ // Preventing instantiation of the same.
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, String> convertToProperties(String s)
+ throws Exception {
+
+ checkNotNull(s, "Input should not be null.");
+
+ try {
+ JSONObject json = new JSONObject(s);
+ Map<String, Object> wm = new HashMap<>();
+ Iterator<String> ii = json.keys();
+ while (ii.hasNext()) {
+ String key1 = ii.next();
+ wm.put(key1, json.get(key1));
+ }
+
+ Map<String, String> mm = new HashMap<>();
+
+ while (!wm.isEmpty())
+ for (String key : new ArrayList<>(wm.keySet())) {
+ Object o = wm.get(key);
+ wm.remove(key);
+
+ if (o instanceof Boolean || o instanceof Number || o instanceof String) {
+ mm.put(key, o.toString());
+
+ log.info("Added property: {} : {}", key, o.toString());
+ } else if (o instanceof JSONObject) {
+ JSONObject jo = (JSONObject) o;
+ Iterator<String> i = jo.keys();
+ while (i.hasNext()) {
+ String key1 = i.next();
+ wm.put(key + "." + key1, jo.get(key1));
+ }
+ } else if (o instanceof JSONArray) {
+ JSONArray ja = (JSONArray) o;
+ mm.put(key + "_length", String.valueOf(ja.length()));
+
+ log.info("Added property: {}_length: {}", key, String.valueOf(ja.length()));
+
+ for (int i = 0; i < ja.length(); i++)
+ wm.put(key + '[' + i + ']', ja.get(i));
+ }
+ }
+ return mm;
+ } catch (JSONException e) {
+ throw new Exception("Unable to convert JSON to properties" + e.getLocalizedMessage(), e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java
new file mode 100755
index 0000000..02fd68b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java
@@ -0,0 +1,52 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import java.util.Set;
+
+public class Parameters {
+ public String templateFileName;
+ public String restapiUrl;
+ public String restapiUser;
+ public String restapiPassword;
+ public Format format;
+ public String contentType;
+ public HttpMethod httpMethod;
+ public String responsePrefix;
+ public Set<String> listNameList;
+ public boolean skipSending;
+ public boolean convertResponse;
+ public String keyStoreFileName;
+ public String keyStorePassword;
+ public String trustStoreFileName;
+ public String trustStorePassword;
+ public boolean ssl;
+ public String customHttpHeaders;
+ public String partner;
+ public Boolean dumpHeaders;
+ public String requestBody;
+ public String oAuthConsumerKey;
+ public String oAuthConsumerSecret;
+ public String oAuthSignatureMethod;
+ public String oAuthVersion;
+ public AuthType authtype;
+ public Boolean returnRequestPayload;
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java
new file mode 100755
index 0000000..754c73d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import com.att.nsa.cmdLine.NsaCommandLineUtil;
+import com.att.nsa.drumlin.service.framework.DrumlinServlet;
+import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Map;
+
+public class RestConfCollector {
+
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.restconf.common.error");
+
+ public static void main(String[] args) {
+ try {
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ final String config = NsaCommandLineUtil.getSetting(argMap, Constants.KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, RestConfCollector.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ RestConfProc restConfProc = new RestConfProc(settings);
+ Map<String, String> paraMap = restConfProc.getParaMap();
+ String restApiURL = paraMap.get(Constants.KSETTING_REST_API_URL);
+ String sseEventsURL = paraMap.get(Constants.KSETTING_SSE_CONNECT_URL);
+ String[] listRestApiURL = restApiURL.split(";");
+ String[] listSseEventsURL = sseEventsURL.split(";");
+ for (int i = 0; i < listRestApiURL.length; i++) {
+ paraMap.put(Constants.KSETTING_REST_API_URL, "http://" + listRestApiURL[i] +
+ "/RestConfServer/rest/operations/establish-subscription");
+ paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, listSseEventsURL[i]);
+ restConfProc.establishSubscription(paraMap, restConfProc.getCtx());
+ }
+
+ } catch (Exception e) {
+ RestConfCollector.eplog.error("Fatal error during application startup", e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java
new file mode 100755
index 0000000..4cb16ae
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import java.util.HashMap;
+import java.util.Set;
+
+public class RestConfContext {
+ private HashMap<String, String> attributes;
+
+ public RestConfContext() {
+ attributes = new HashMap<>();
+ }
+
+ public String getAttribute(String name) {
+ return attributes.getOrDefault(name, null);
+ }
+
+ public void setAttribute(String name, String value) {
+ if (value == null) {
+ if (attributes.containsKey(name)) {
+ attributes.remove(name);
+ }
+ } else {
+ attributes.put(name, value);
+ }
+ }
+
+ public Set<String> getAttributeKeySet() {
+ return attributes.keySet();
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java
new file mode 100755
index 0000000..67ea1fb
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java
@@ -0,0 +1,224 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+public class RestConfProc {
+
+ private static final Logger log = LoggerFactory.getLogger(RestConfProc.class);
+
+ public static String format;
+
+ private static RestConfContext ctx = new RestConfContext();
+
+ private static final Logger oplog = LoggerFactory.getLogger("org.onap.restconf.common.output");
+
+ private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
+
+ private final Map<String, String> paraMap = new HashMap<>();
+ private static String cambriaConfigFile;
+
+ public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+
+ public static String streamID;
+ private ExecutorService executor = Executors.newCachedThreadPool();
+
+ public RestConfProc() {
+ }
+
+ private void parseInputParameters(rrNvReadable settings) {
+ String tempFileName;
+ String restApiUrl;
+ String httpMetthod;
+ String respPrefix;
+ String skipSending;
+ String sseConnectUrl;
+ String[] currentConfigFile;
+
+ currentConfigFile = settings.getStrings(Constants.KSETTING_DMAAPCONFIGS, Constants.KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentConfigFile[0];
+
+ tempFileName = settings.getString(Constants.KDEFAULT_TEMP_FILENAME, null);
+ restApiUrl = settings.getString(Constants.KSETTING_REST_API_URL, null);
+ httpMetthod = settings.getString(Constants.KSETTING_HTTP_METHOD, null);
+ respPrefix = settings.getString(Constants.KSETTING_RESP_PREFIX, null);
+ skipSending = settings.getString(Constants.KSETTING_SKIP_SENDING, null);
+ sseConnectUrl = settings.getString(Constants.KSETTING_SSE_CONNECT_URL, null);
+ format = settings.getString(Constants.KSETTING_FORMAT, null);
+ streamID = "route=route_failure";
+
+ paraMap.put(Constants.KDEFAULT_TEMP_FILENAME, tempFileName);
+ paraMap.put(Constants.KSETTING_REST_API_URL, restApiUrl);
+ paraMap.put(Constants.KSETTING_HTTP_METHOD, httpMetthod);
+ paraMap.put(Constants.KSETTING_RESP_PREFIX, respPrefix);
+ paraMap.put(Constants.KSETTING_SKIP_SENDING, skipSending);
+ paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, sseConnectUrl);
+ paraMap.put(Constants.KSETTING_FORMAT, format);
+
+ ctx.setAttribute("prop.encoding-json", "encoding-json");
+ ctx.setAttribute("restapi-result.response-code", "200");
+ ctx.setAttribute("restapi-result.ietf-subscribed-notifications:output.identifier", "100");
+ }
+
+ public RestConfProc(rrNvReadable settings) {
+
+ parseInputParameters(settings);
+
+ fProcessingInputQueue = new LinkedBlockingQueue<>(Constants.KDEFAULT_MAXQUEUEDEVENTS);
+
+ EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
+ DMaaPConfigurationParser
+ .parseToDomainMapping(Paths.get(cambriaConfigFile))
+ .get()));
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < 20; ++i) {
+ executor.execute(ep);
+ }
+ }
+
+ /**
+ * To establish a subscription with controller by sending HTTP request
+ *
+ * @param paramMap holds the input configuration
+ * @param ctx restconf context
+ * @throws Exception exception
+ */
+ public void establishSubscription(Map<String, String> paramMap, RestConfContext ctx) throws Exception {
+
+ RestapiCallNode restApiCallNode = new RestapiCallNode();
+
+ restApiCallNode.sendRequest(paramMap, ctx, null);
+
+ establishPersistentConnection(paramMap, ctx);
+ }
+
+ /**
+ * To establish persistent connection after receiving successful subscription response from controller
+ *
+ * @param paramMap holds the input configuration
+ * @param ctx restconf context
+ */
+ public void establishPersistentConnection(Map<String, String> paramMap, RestConfContext ctx) {
+
+ // check whether response is ok
+ if (ctx.getAttribute(Constants.RESPONSE_CODE).equals(Constants.RESPONSE_CODE_200)) {
+
+ String id = ctx.getAttribute(Constants.OUTPUT_IDENTIFIER);
+
+ String url = paramMap.get(Constants.KSETTING_SSE_CONNECT_URL);
+
+ PersistentConnection connection = new PersistentConnection(url, ctx);
+ runnableInfo.put(id, connection);
+ executor.execute(connection);
+ } else {
+ // error response is already updated in ctx
+ log.info("Failed to subscribe");
+ }
+ }
+
+ /**
+ * Get input parameter map
+ *
+ * @return input parameters map
+ */
+ public Map<String, String> getParaMap() {
+ return paraMap;
+ }
+
+
+ /**
+ * Get restConf context which has information about message encoding type
+ *
+ * @return restconf context
+ */
+ public RestConfContext getCtx() {
+ return ctx;
+ }
+
+ public class PersistentConnection implements Runnable {
+ private String url;
+ private RestConfContext ctx;
+ private volatile boolean running = true;
+
+ public PersistentConnection(String url, RestConfContext ctx) {
+ this.url = url;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void run() {
+ Client client = ClientBuilder.newBuilder()
+ .register(SseFeature.class).build();
+ WebTarget target = client.target(url);
+ EventSource eventSource = EventSource.target(target).build();
+ eventSource.register(new DataChangeEventListener(ctx));
+ eventSource.open();
+ log.info("Connected to SSE source");
+ while (running) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ log.info("Exception: " + ie.getMessage());
+ }
+ }
+ eventSource.close();
+ log.info("Closed connection to SSE source");
+ }
+ }
+
+ /**
+ * To process the array of events which are received from controller
+ *
+ * @param a JSONArray
+ * @throws Exception exception
+ */
+ public static void handleEvents(JSONArray a) throws Exception {
+ for (int i = 0; i < a.length(); i++) {
+ if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
+ throw new Exception();
+ }
+ }
+ log.debug("RestConfCollector.handleEvents:EVENTS has been published successfully!");
+ }
+}
+
+
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java
new file mode 100755
index 0000000..a324e87
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java
@@ -0,0 +1,600 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
+import com.sun.jersey.api.client.filter.HTTPDigestAuthFilter;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
+import com.sun.jersey.oauth.client.OAuthClientFilter;
+import com.sun.jersey.oauth.signature.OAuthParameters;
+import com.sun.jersey.oauth.signature.OAuthSecrets;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.EntityTag;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriBuilder;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RestapiCallNode {
+ private static final Logger log = LoggerFactory.getLogger(RestapiCallNode.class);
+
+ public void sendRequest(Map<String, String> paramMap, RestConfContext ctx, Integer retryCount) throws Exception {
+ RetryPolicy retryPolicy = null;
+ HttpResponse r = new HttpResponse();
+ try {
+ Parameters p = getParameters(paramMap);
+ String pp = p.responsePrefix != null ? p.responsePrefix + '.' : "";
+ String req = null;
+ if (p.templateFileName != null) {
+ String reqTemplate = readFile(p.templateFileName);
+ req = buildXmlJsonRequest(ctx, reqTemplate, p.format);
+ } else if (p.requestBody != null) {
+ req = p.requestBody;
+ }
+
+ r = sendHttpRequest(req, p);
+ setResponseStatus(ctx, p.responsePrefix, r);
+
+ if (p.dumpHeaders && r.headers != null) {
+ for (Map.Entry<String, List<String>> a : r.headers.entrySet()) {
+ ctx.setAttribute(pp + "header." + a.getKey(), StringUtils.join(a.getValue(), ","));
+ }
+ }
+
+ if (p.returnRequestPayload && req != null) {
+ ctx.setAttribute(pp + "httpRequest", req);
+ }
+
+ if (r.body != null && r.body.trim().length() > 0) {
+ ctx.setAttribute(pp + "httpResponse", r.body);
+
+ if (p.convertResponse) {
+ Map<String, String> mm = null;
+ if (p.format == Format.XML) {
+ mm = XmlParser.convertToProperties(r.body, p.listNameList);
+ } else if (p.format == Format.JSON) {
+ mm = JsonParser.convertToProperties(r.body);
+ }
+
+ if (mm != null) {
+ for (Map.Entry<String, String> entry : mm.entrySet())
+ ctx.setAttribute(pp + entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ } catch (Exception e) {
+ boolean shouldRetry = false;
+ if (e.getCause().getCause() instanceof SocketException) {
+ shouldRetry = true;
+ }
+
+ log.error("Error sending the request: " + e.getMessage(), e);
+ String prefix = parseParam(paramMap, "responsePrefix", false, null);
+ if (retryPolicy == null || shouldRetry == false) {
+ setFailureResponseStatus(ctx, prefix, e.getMessage(), r);
+ } else {
+ if (retryCount == null) {
+ retryCount = 0;
+ }
+ String retryMessage = retryCount + " attempts were made out of " + retryPolicy.getMaximumRetries() +
+ " maximum retries.";
+ log.debug(retryMessage);
+ try {
+ retryCount = retryCount + 1;
+ if (retryCount < retryPolicy.getMaximumRetries() + 1) {
+ URI uri = new URI(paramMap.get("restapiUrl"));
+ String hostname = uri.getHost();
+ String retryString = retryPolicy.getNextHostName(uri.toString());
+ URI uriTwo = new URI(retryString);
+ URI retryUri = UriBuilder.fromUri(uri).host(uriTwo.getHost()).port(uriTwo.getPort()).scheme(
+ uriTwo.getScheme()).build();
+ paramMap.put("restapiUrl", retryUri.toString());
+ log.debug("URL was set to {}", retryUri.toString());
+ log.debug("Failed to communicate with host {}. Request will be re-attempted using the host {}.",
+ hostname, retryString);
+ log.debug("This is retry attempt {} out of {}", retryCount, retryPolicy.getMaximumRetries());
+ sendRequest(paramMap, ctx, retryCount);
+ } else {
+ log.debug("Maximum retries reached, calling setFailureResponseStatus.");
+ setFailureResponseStatus(ctx, prefix, e.getMessage(), r);
+ }
+ } catch (Exception ex) {
+ log.error("Could not attempt retry.", ex);
+ String retryErrorMessage =
+ "Retry attempt has failed. No further retry shall be attempted, calling " +
+ "setFailureResponseStatus.";
+ setFailureResponseStatus(ctx, prefix, retryErrorMessage, r);
+ }
+ }
+ }
+
+ if (r != null && r.code >= 300) {
+ throw new Exception(String.valueOf(r.code) + ": " + r.message);
+ }
+ }
+
+ protected Parameters getParameters(Map<String, String> paramMap) throws Exception {
+ Parameters p = new Parameters();
+ p.templateFileName = parseParam(paramMap, "templateFileName", false, null);
+ p.requestBody = parseParam(paramMap, "requestBody", false, null);
+ p.restapiUrl = parseParam(paramMap, "restapiUrl", true, null);
+ validateUrl(p.restapiUrl);
+ p.restapiUser = parseParam(paramMap, "restapiUser", false, null);
+ p.restapiPassword = parseParam(paramMap, "restapiPassword", false, null);
+ p.oAuthConsumerKey = parseParam(paramMap, "oAuthConsumerKey", false, null);
+ p.oAuthConsumerSecret = parseParam(paramMap, "oAuthConsumerSecret", false, null);
+ p.oAuthSignatureMethod = parseParam(paramMap, "oAuthSignatureMethod", false, null);
+ p.oAuthVersion = parseParam(paramMap, "oAuthVersion", false, null);
+ p.contentType = parseParam(paramMap, "contentType", false, null);
+ p.format = Format.fromString(parseParam(paramMap, "format", false, "json"));
+ p.authtype = AuthType.fromString(parseParam(paramMap, "authType", false, "unspecified"));
+ p.httpMethod = HttpMethod.fromString(parseParam(paramMap, "httpMethod", false, "post"));
+ p.responsePrefix = parseParam(paramMap, "responsePrefix", false, null);
+ p.listNameList = getListNameList(paramMap);
+ String skipSendingStr = paramMap.get("skipSending");
+ p.skipSending = "true".equalsIgnoreCase(skipSendingStr);
+ p.convertResponse = Boolean.valueOf(parseParam(paramMap, "convertResponse", false, "true"));
+ p.trustStoreFileName = parseParam(paramMap, "trustStoreFileName", false, null);
+ p.trustStorePassword = parseParam(paramMap, "trustStorePassword", false, null);
+ p.keyStoreFileName = parseParam(paramMap, "keyStoreFileName", false, null);
+ p.keyStorePassword = parseParam(paramMap, "keyStorePassword", false, null);
+ p.ssl = p.trustStoreFileName != null && p.trustStorePassword != null && p.keyStoreFileName != null &&
+ p.keyStorePassword != null;
+ p.customHttpHeaders = parseParam(paramMap, "customHttpHeaders", false, null);
+ p.partner = parseParam(paramMap, "partner", false, null);
+ p.dumpHeaders = Boolean.valueOf(parseParam(paramMap, "dumpHeaders", false, null));
+ p.returnRequestPayload = Boolean.valueOf(parseParam(paramMap, "returnRequestPayload", false, null));
+ return p;
+ }
+
+ private void validateUrl(String restapiUrl) throws Exception {
+ try {
+ URI.create(restapiUrl);
+ } catch (IllegalArgumentException e) {
+ throw new Exception("Invalid input of url " + e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected Set<String> getListNameList(Map<String, String> paramMap) {
+ Set<String> ll = new HashSet<>();
+ for (Map.Entry<String, String> entry : paramMap.entrySet())
+ if (entry.getKey().startsWith("listName")) {
+ ll.add(entry.getValue());
+ }
+ return ll;
+ }
+
+ protected String parseParam(Map<String, String> paramMap, String name, boolean required, String def)
+ throws Exception {
+ String s = paramMap.get(name);
+
+ if (s == null || s.trim().length() == 0) {
+ if (!required) {
+ return def;
+ }
+ throw new Exception("Parameter " + name + " is required in RestapiCallNode");
+ }
+
+ s = s.trim();
+ StringBuilder value = new StringBuilder();
+ int i = 0;
+ int i1 = s.indexOf('%');
+ while (i1 >= 0) {
+ int i2 = s.indexOf('%', i1 + 1);
+ if (i2 < 0) {
+ break;
+ }
+
+ String varName = s.substring(i1 + 1, i2);
+ String varValue = System.getenv(varName);
+ if (varValue == null) {
+ varValue = "%" + varName + "%";
+ }
+
+ value.append(s.substring(i, i1));
+ value.append(varValue);
+
+ i = i2 + 1;
+ i1 = s.indexOf('%', i);
+ }
+ value.append(s.substring(i));
+
+ log.info("Parameter {}: [{}]", name, value);
+ return value.toString();
+ }
+
+ protected String buildXmlJsonRequest(RestConfContext ctx, String template, Format format) throws Exception {
+ log.info("Building {} started", format);
+ long t1 = System.currentTimeMillis();
+
+ template = expandRepeats(ctx, template, 1);
+
+ Map<String, String> mm = new HashMap<>();
+ for (String s : ctx.getAttributeKeySet())
+ mm.put(s, ctx.getAttribute(s));
+ StringBuilder ss = new StringBuilder();
+ int i = 0;
+ while (i < template.length()) {
+ int i1 = template.indexOf("${", i);
+ if (i1 < 0) {
+ ss.append(template.substring(i));
+ break;
+ }
+
+ int i2 = template.indexOf('}', i1 + 2);
+ if (i2 < 0) {
+ throw new Exception("Template error: Matching } not found");
+ }
+
+ String var1 = template.substring(i1 + 2, i2);
+ String value1 = format == Format.XML ? XmlJsonUtil.getXml(mm, var1) : XmlJsonUtil.getJson(mm, var1);
+ // log.info(" " + var1 + ": " + value1);
+ if (value1 == null || value1.trim().length() == 0) {
+ // delete the whole element (line)
+ int i3 = template.lastIndexOf('\n', i1);
+ if (i3 < 0) {
+ i3 = 0;
+ }
+ int i4 = template.indexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = template.length();
+ }
+
+ if (i < i3) {
+ ss.append(template.substring(i, i3));
+ }
+ i = i4;
+ } else {
+ ss.append(template.substring(i, i1)).append(value1);
+ i = i2 + 1;
+ }
+ }
+
+ String req = format == Format.XML
+ ? XmlJsonUtil.removeEmptyStructXml(ss.toString()) : XmlJsonUtil.removeEmptyStructJson(ss.toString());
+
+ if (format == Format.JSON) {
+ req = XmlJsonUtil.removeLastCommaJson(req);
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("Building {} completed. Time: {}", format, (t2 - t1));
+
+ return req;
+ }
+
+ protected String expandRepeats(RestConfContext ctx, String template, int level) throws Exception {
+ StringBuilder newTemplate = new StringBuilder();
+ int k = 0;
+ while (k < template.length()) {
+ int i1 = template.indexOf("${repeat:", k);
+ if (i1 < 0) {
+ newTemplate.append(template.substring(k));
+ break;
+ }
+
+ int i2 = template.indexOf(':', i1 + 9);
+ if (i2 < 0) {
+ throw new Exception(
+ "Template error: Context variable name followed by : is required after repeat");
+ }
+
+ // Find the closing }, store in i3
+ int nn = 1;
+ int i3 = -1;
+ int i = i2;
+ while (nn > 0 && i < template.length()) {
+ i3 = template.indexOf('}', i);
+ if (i3 < 0) {
+ throw new Exception("Template error: Matching } not found");
+ }
+ int i32 = template.indexOf('{', i);
+ if (i32 >= 0 && i32 < i3) {
+ nn++;
+ i = i32 + 1;
+ } else {
+ nn--;
+ i = i3 + 1;
+ }
+ }
+
+ String var1 = template.substring(i1 + 9, i2);
+ String value1 = ctx.getAttribute(var1);
+ log.info(" {}:{}", var1, value1);
+ int n = 0;
+ try {
+ n = Integer.parseInt(value1);
+ } catch (NumberFormatException e) {
+ log.info("value1 not set or not a number, n will remain set at zero");
+ }
+
+ newTemplate.append(template.substring(k, i1));
+
+ String rpt = template.substring(i2 + 1, i3);
+
+ for (int ii = 0; ii < n; ii++) {
+ String ss = rpt.replaceAll("\\[\\$\\{" + level + "\\}\\]", "[" + ii + "]");
+ if (ii == n - 1 && ss.trim().endsWith(",")) {
+ int i4 = ss.lastIndexOf(',');
+ if (i4 > 0) {
+ ss = ss.substring(0, i4) + ss.substring(i4 + 1);
+ }
+ }
+ newTemplate.append(ss);
+ }
+
+ k = i3 + 1;
+ }
+
+ if (k == 0) {
+ return newTemplate.toString();
+ }
+
+ return expandRepeats(ctx, newTemplate.toString(), level + 1);
+ }
+
+ protected String readFile(String fileName) throws Exception {
+ try {
+ byte[] encoded = Files.readAllBytes(Paths.get(fileName));
+ return new String(encoded, "UTF-8");
+ } catch (IOException | SecurityException e) {
+ throw new Exception("Unable to read file " + fileName + e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected Client addAuthType(Client client, Parameters p) throws Exception {
+ if (p.authtype == AuthType.Unspecified) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword));
+ } else if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null
+ && p.oAuthSignatureMethod != null) {
+ OAuthParameters params = new OAuthParameters()
+ .signatureMethod(p.oAuthSignatureMethod)
+ .consumerKey(p.oAuthConsumerKey)
+ .version(p.oAuthVersion);
+
+ OAuthSecrets secrets = new OAuthSecrets()
+ .consumerSecret(p.oAuthConsumerSecret);
+ client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets));
+ }
+ } else {
+ if (p.authtype == AuthType.DIGEST) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPDigestAuthFilter(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.BASIC) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.OAUTH) {
+ if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null && p.oAuthSignatureMethod != null) {
+ OAuthParameters params = new OAuthParameters()
+ .signatureMethod(p.oAuthSignatureMethod)
+ .consumerKey(p.oAuthConsumerKey)
+ .version(p.oAuthVersion);
+
+ OAuthSecrets secrets = new OAuthSecrets()
+ .consumerSecret(p.oAuthConsumerSecret);
+ client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all oAuthConsumerKey, oAuthConsumerSecret " +
+ "and oAuthSignatureMethod parameters doesn't exist", new Throwable());
+ }
+ }
+ }
+ return client;
+ }
+
+ protected HttpResponse sendHttpRequest(String request, Parameters p) throws Exception {
+
+ ClientConfig config = new DefaultClientConfig();
+ SSLContext ssl = null;
+ if (p.ssl && p.restapiUrl.startsWith("https")) {
+ ssl = createSSLContext(p);
+ }
+ if (ssl != null) {
+ HostnameVerifier hostnameVerifier = (hostname, session) -> true;
+
+ config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES,
+ new HTTPSProperties(hostnameVerifier, ssl));
+ }
+
+ logProperties(config.getProperties());
+
+ Client client = Client.create(config);
+ client.setConnectTimeout(5000);
+ WebResource webResource = addAuthType(client, p).resource(p.restapiUrl);
+
+ log.info("Sending request:");
+ log.info(request);
+ long t1 = System.currentTimeMillis();
+
+ HttpResponse r = new HttpResponse();
+ r.code = 200;
+
+ if (!p.skipSending) {
+ String tt = p.format == Format.XML ? "application/xml" : "application/json";
+ String tt1 = tt + ";charset=UTF-8";
+ if (p.contentType != null) {
+ tt = p.contentType;
+ tt1 = p.contentType;
+ }
+
+ WebResource.Builder webResourceBuilder = webResource.accept(tt).type(tt1);
+ if (p.format == Format.NONE) {
+ webResourceBuilder = webResource.header("", "");
+ }
+
+ if (p.customHttpHeaders != null && p.customHttpHeaders.length() > 0) {
+ String[] keyValuePairs = p.customHttpHeaders.split(",");
+ for (String singlePair : keyValuePairs) {
+ int equalPosition = singlePair.indexOf('=');
+ webResourceBuilder.header(singlePair.substring(0, equalPosition),
+ singlePair.substring(equalPosition + 1, singlePair.length()));
+ }
+ }
+
+ webResourceBuilder.header("X-ECOMP-RequestID", org.slf4j.MDC.get("X-ECOMP-RequestID"));
+
+ ClientResponse response;
+
+ try {
+ response = webResourceBuilder.method(p.httpMethod.toString(), ClientResponse.class, request);
+ } catch (UniformInterfaceException | ClientHandlerException e) {
+ throw new Exception("Exception while sending http request to client "
+ + e.getLocalizedMessage(), e);
+ }
+
+ r.code = response.getStatus();
+ r.headers = response.getHeaders();
+ EntityTag etag = response.getEntityTag();
+ if (etag != null) {
+ r.message = etag.getValue();
+ }
+ if (response.hasEntity() && r.code != 204) {
+ r.body = response.getEntity(String.class);
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("Response received. Time: {}", (t2 - t1));
+ log.info("HTTP response code: {}", r.code);
+ log.info("HTTP response message: {}", r.message);
+ logHeaders(r.headers);
+ log.info("HTTP response: {}", r.body);
+
+ return r;
+ }
+
+ protected void setFailureResponseStatus(RestConfContext ctx, String prefix, String errorMessage,
+ HttpResponse resp) {
+ resp.code = 500;
+ resp.message = errorMessage;
+ String pp = prefix != null ? prefix + '.' : "";
+ ctx.setAttribute(pp + "response-code", String.valueOf(resp.code));
+ ctx.setAttribute(pp + "response-message", resp.message);
+ }
+
+ protected void setResponseStatus(RestConfContext ctx, String prefix, HttpResponse r) {
+ String pp = prefix != null ? prefix + '.' : "";
+ ctx.setAttribute(pp + "response-code", String.valueOf(r.code));
+ ctx.setAttribute(pp + "response-message", r.message);
+ }
+
+ protected SSLContext createSSLContext(Parameters p) {
+ try (FileInputStream in = new FileInputStream(p.keyStoreFileName)) {
+ System.setProperty("jsse.enableSNIExtension", "false");
+ System.setProperty("javax.net.ssl.trustStore", p.trustStoreFileName);
+ System.setProperty("javax.net.ssl.trustStorePassword", p.trustStorePassword);
+
+ HttpsURLConnection.setDefaultHostnameVerifier((string, ssls) -> true);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ char[] pwd = p.keyStorePassword.toCharArray();
+ ks.load(in, pwd);
+ kmf.init(ks, pwd);
+
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(kmf.getKeyManagers(), null, null);
+ return ctx;
+ } catch (Exception e) {
+ log.error("Error creating SSLContext: {}", e.getMessage(), e);
+ }
+ return null;
+ }
+
+ protected void logProperties(Map<String, Object> mm) {
+ List<String> ll = new ArrayList<>();
+ for (Object o : mm.keySet())
+ ll.add((String) o);
+ Collections.sort(ll);
+
+ log.info("Properties:");
+ for (String name : ll)
+ log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+ }
+
+ protected void logHeaders(MultivaluedMap<String, String> mm) {
+ log.info("HTTP response headers:");
+
+ if (mm == null) {
+ return;
+ }
+
+ List<String> ll = new ArrayList<>();
+ for (Object o : mm.keySet())
+ ll.add((String) o);
+ Collections.sort(ll);
+
+ for (String name : ll)
+ log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+ }
+
+ private static class FileParam {
+
+ public String fileName;
+ public String url;
+ public String user;
+ public String password;
+ public HttpMethod httpMethod;
+ public String responsePrefix;
+ public boolean skipSending;
+ public String oAuthConsumerKey;
+ public String oAuthConsumerSecret;
+ public String oAuthSignatureMethod;
+ public String oAuthVersion;
+ public AuthType authtype;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java
new file mode 100755
index 0000000..91c0a9c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+public class RetryException extends Exception {
+ public RetryException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java
new file mode 100755
index 0000000..7bc0759
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+public class RetryPolicy {
+ private String[] hostnames;
+ private Integer maximumRetries;
+
+ public Integer getMaximumRetries() {
+ return maximumRetries;
+ }
+
+ public String getNextHostName(String uri) throws RetryException {
+ Integer position = null;
+
+ for (int i = 0; i < hostnames.length; i++) {
+ if (uri.contains(hostnames[i])) {
+ position = i;
+ break;
+ }
+ }
+
+ if (position == null) {
+ throw new RetryException("No match found for the provided uri[" + uri + "] " +
+ "so the next host name could not be retreived");
+ }
+ position++;
+
+ if (position > hostnames.length - 1) {
+ position = 0;
+ }
+ return hostnames[position];
+ }
+
+ public RetryPolicy(String[] hostnames, Integer maximumRetries) {
+ this.hostnames = hostnames;
+ this.maximumRetries = maximumRetries;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java
new file mode 100755
index 0000000..10b0e00
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+public class RetryPolicyStore {
+ private static final Logger log = LoggerFactory.getLogger(RetryPolicyStore.class);
+
+ HashMap<String, RetryPolicy> retryPolicies;
+ public String proxyServers;
+
+ public String getProxyServers() {
+ return proxyServers;
+ }
+
+ public void setProxyServers(String admServers) {
+ this.proxyServers = admServers;
+ String[] adminServersArray = admServers.split(",");
+ RetryPolicy adminPortalRetry = new RetryPolicy(adminServersArray, adminServersArray.length);
+ retryPolicies.put("dme2proxy", adminPortalRetry);
+ }
+
+ public RetryPolicyStore() {
+ retryPolicies = new HashMap<>();
+ }
+
+ public RetryPolicy getRetryPolicy(String policyName) {
+ return (this.retryPolicies.get(policyName));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java
new file mode 100755
index 0000000..382ef4d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java
@@ -0,0 +1,412 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class XmlJsonUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlJsonUtil.class);
+
+ private XmlJsonUtil() {
+ // Preventing instantiation of the same.
+ }
+
+ public static String getXml(Map<String, String> varmap, String var) {
+ boolean escape = true;
+ if (var.startsWith("'")) {
+ var = var.substring(1);
+ escape = false;
+ }
+
+ Object o = createStructure(varmap, var);
+ return generateXml(o, 0, escape);
+ }
+
+ public static String getJson(Map<String, String> varmap, String var) {
+ boolean escape = true;
+ if (var.startsWith("'")) {
+ var = var.substring(1);
+ escape = false;
+ }
+
+ boolean quotes = true;
+ if (var.startsWith("\"")) {
+ var = var.substring(1);
+ quotes = false;
+ }
+
+ Object o = createStructure(varmap, var);
+ return generateJson(o, escape, quotes);
+ }
+
+ private static Object createStructure(Map<String, String> flatmap, String var) {
+ if (flatmap.containsKey(var)) {
+ if (var.endsWith("_length") || var.endsWith("].key")) {
+ return null;
+ }
+ return flatmap.get(var);
+ }
+
+ Map<String, Object> mm = new HashMap<>();
+ for (String k : flatmap.keySet())
+ if (k.startsWith(var + ".")) {
+ int i1 = k.indexOf('.', var.length() + 1);
+ int i2 = k.indexOf('[', var.length() + 1);
+ int i3 = k.length();
+ if (i1 > 0 && i1 < i3) {
+ i3 = i1;
+ }
+ if (i2 > 0 && i2 < i3) {
+ i3 = i2;
+ }
+ String k1 = k.substring(var.length() + 1, i3);
+ String var1 = k.substring(0, i3);
+ if (!mm.containsKey(k1)) {
+ Object str = createStructure(flatmap, var1);
+ if (str != null && (!(str instanceof String) || ((String) str).trim().length() > 0)) {
+ mm.put(k1, str);
+ }
+ }
+ }
+ if (!mm.isEmpty()) {
+ return mm;
+ }
+
+ boolean arrayFound = false;
+ for (String k : flatmap.keySet())
+ if (k.startsWith(var + "[")) {
+ arrayFound = true;
+ break;
+ }
+
+ if (arrayFound) {
+ List<Object> ll = new ArrayList<>();
+
+ int length = Integer.MAX_VALUE;
+ String lengthStr = flatmap.get(var + "_length");
+ if (lengthStr != null) {
+ try {
+ length = Integer.parseInt(lengthStr);
+ } catch (Exception e) {
+ log.warn("Invalid number for {}_length:{}", var, lengthStr, e);
+ }
+ }
+
+ for (int i = 0; i < length; i++) {
+ Object v = createStructure(flatmap, var + '[' + i + ']');
+ if (v == null) {
+ break;
+ }
+ ll.add(v);
+ }
+
+ if (!ll.isEmpty()) {
+ return ll;
+ }
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String generateXml(Object o, int indent, boolean escape) {
+ if (o == null) {
+ return null;
+ }
+
+ if (o instanceof String) {
+ return escape ? escapeXml((String) o) : (String) o;
+ }
+ ;
+
+ if (o instanceof Map) {
+ StringBuilder ss = new StringBuilder();
+ Map<String, Object> mm = (Map<String, Object>) o;
+ for (Map.Entry<String, Object> entry : mm.entrySet()) {
+ Object v = entry.getValue();
+ String key = entry.getKey();
+ if (v instanceof String) {
+ String s = escape ? escapeXml((String) v) : (String) v;
+ ss.append(pad(indent)).append('<').append(key).append('>');
+ ss.append(s);
+ ss.append("</").append(key).append('>').append('\n');
+ } else if (v instanceof Map) {
+ ss.append(pad(indent)).append('<').append(key).append('>').append('\n');
+ ss.append(generateXml(v, indent + 1, escape));
+ ss.append(pad(indent)).append("</").append(key).append('>').append('\n');
+ } else if (v instanceof List) {
+ List<Object> ll = (List<Object>) v;
+ for (Object o1 : ll) {
+ ss.append(pad(indent)).append('<').append(key).append('>').append('\n');
+ ss.append(generateXml(o1, indent + 1, escape));
+ ss.append(pad(indent)).append("</").append(key).append('>').append('\n');
+ }
+ }
+ }
+ return ss.toString();
+ }
+
+ return null;
+ }
+
+ private static String generateJson(Object o, boolean escape, boolean quotes) {
+ if (o == null) {
+ return null;
+ }
+
+ StringBuilder ss = new StringBuilder();
+ generateJson(ss, o, 0, false, escape, quotes);
+ return ss.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void generateJson(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape, boolean quotes) {
+ if (o instanceof String) {
+ String s = escape ? escapeJson((String) o) : (String) o;
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ if (quotes) {
+ ss.append('"').append(s).append('"');
+ } else {
+ ss.append(s);
+ }
+ return;
+ }
+
+ if (o instanceof Map) {
+ Map<String, Object> mm = (Map<String, Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("{\n");
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry : mm.entrySet()) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+ Object v = entry.getValue();
+ String key = entry.getKey();
+ ss.append(pad(indent + 1)).append('"').append(key).append("\": ");
+ generateJson(ss, v, indent + 1, false, escape, true);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append('}');
+
+ return;
+ }
+
+ if (o instanceof List) {
+ List<Object> ll = (List<Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("[\n");
+
+ boolean first = true;
+ for (Object o1 : ll) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ generateJson(ss, o1, indent + 1, true, escape, quotes);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append(']');
+ }
+ }
+
+ public static String removeLastCommaJson(String s) {
+ StringBuilder sb = new StringBuilder();
+ int k = 0;
+ int start = 0;
+ while (k < s.length()) {
+ int i11 = s.indexOf('}', k);
+ int i12 = s.indexOf(']', k);
+ int i1 = -1;
+ if (i11 < 0) {
+ i1 = i12;
+ } else if (i12 < 0) {
+ i1 = i11;
+ } else {
+ i1 = i11 < i12 ? i11 : i12;
+ }
+ if (i1 < 0) {
+ break;
+ }
+
+ int i2 = s.lastIndexOf(',', i1);
+ if (i2 < 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ String between = s.substring(i2 + 1, i1);
+ if (between.trim().length() > 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ sb.append(s.substring(start, i2));
+ start = i2 + 1;
+ k = i1 + 1;
+ }
+
+ sb.append(s.substring(start, s.length()));
+
+ return sb.toString();
+ }
+
+ public static String removeEmptyStructJson(String s) {
+ int k = 0;
+ while (k < s.length()) {
+ boolean curly = true;
+ int i11 = s.indexOf('{', k);
+ int i12 = s.indexOf('[', k);
+ int i1 = -1;
+ if (i11 < 0) {
+ i1 = i12;
+ curly = false;
+ } else if (i12 < 0) {
+ i1 = i11;
+ } else if (i11 < i12) {
+ i1 = i11;
+ } else {
+ i1 = i12;
+ curly = false;
+ }
+
+ if (i1 >= 0) {
+ int i2 = curly ? s.indexOf('}', i1) : s.indexOf(']', i1);
+ if (i2 > 0) {
+ String value = s.substring(i1 + 1, i2);
+ if (value.trim().length() == 0) {
+ int i4 = s.lastIndexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = 0;
+ }
+ int i5 = s.indexOf('\n', i2);
+ if (i5 < 0) {
+ i5 = s.length();
+ }
+
+ s = s.substring(0, i4) + s.substring(i5);
+ k = 0;
+ } else {
+ k = i1 + 1;
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ return s;
+ }
+
+ public static String removeEmptyStructXml(String s) {
+ int k = 0;
+ while (k < s.length()) {
+ int i1 = s.indexOf('<', k);
+ if (i1 < 0 || i1 == s.length() - 1) {
+ break;
+ }
+
+ char c1 = s.charAt(i1 + 1);
+ if (c1 == '?' || c1 == '!') {
+ k = i1 + 2;
+ continue;
+ }
+
+ int i2 = s.indexOf('>', i1);
+ if (i2 < 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ String closingTag = "</" + s.substring(i1 + 1, i2 + 1);
+ int i3 = s.indexOf(closingTag, i2 + 1);
+ if (i3 < 0) {
+ k = i2 + 1;
+ continue;
+ }
+
+ String value = s.substring(i2 + 1, i3);
+ if (value.trim().length() > 0) {
+ k = i2 + 1;
+ continue;
+ }
+
+ int i4 = s.lastIndexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = 0;
+ }
+ int i5 = s.indexOf('\n', i3);
+ if (i5 < 0) {
+ i5 = s.length();
+ }
+
+ s = s.substring(0, i4) + s.substring(i5);
+ k = 0;
+ }
+
+ return s;
+ }
+
+ private static String escapeXml(String v) {
+ String s = v.replaceAll("&", "&amp;");
+ s = s.replaceAll("<", "&lt;");
+ s = s.replaceAll("'", "&apos;");
+ s = s.replaceAll("\"", "&quot;");
+ s = s.replaceAll(">", "&gt;");
+ return s;
+ }
+
+ private static String escapeJson(String v) {
+ String s = v.replaceAll("\\\\", "\\\\\\\\");
+ s = s.replaceAll("\"", "\\\\\"");
+ return s;
+ }
+
+ private static String pad(int n) {
+ StringBuilder s = new StringBuilder();
+ for (int i = 0; i < n; i++)
+ s.append(Character.toString('\t'));
+ return s.toString();
+ }
+}
+
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java
new file mode 100755
index 0000000..e073cd6
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java
@@ -0,0 +1,178 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class XmlParser {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlParser.class);
+
+ private XmlParser() {
+ // Preventing instantiation of the same.
+ }
+
+ public static Map<String, String> convertToProperties(String s, Set<String> listNameList)
+ throws Exception {
+
+ checkNotNull(s, "Input should not be null.");
+
+ Handler handler = new Handler(listNameList);
+ try {
+ SAXParserFactory factory = SAXParserFactory.newInstance();
+ SAXParser saxParser = factory.newSAXParser();
+ InputStream in = new ByteArrayInputStream(s.getBytes());
+ saxParser.parse(in, handler);
+ } catch (ParserConfigurationException | IOException | SAXException | NumberFormatException e) {
+ throw new Exception("Unable to convert XML to properties" + e.getLocalizedMessage(), e);
+ }
+ return handler.getProperties();
+ }
+
+ private static class Handler extends DefaultHandler {
+
+ private Set<String> listNameList;
+
+ private Map<String, String> properties = new HashMap<>();
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public Handler(Set<String> listNameList) {
+ super();
+ this.listNameList = listNameList;
+ if (this.listNameList == null) {
+ this.listNameList = new HashSet<>();
+ }
+ }
+
+ StringBuilder currentName = new StringBuilder();
+ StringBuilder currentValue = new StringBuilder();
+
+ @Override
+ public void startElement(String uri, String localName, String qName, Attributes attributes)
+ throws SAXException {
+ super.startElement(uri, localName, qName, attributes);
+
+ String name = localName;
+ if (name == null || name.trim().length() == 0) {
+ name = qName;
+ }
+ int i2 = name.indexOf(':');
+ if (i2 >= 0) {
+ name = name.substring(i2 + 1);
+ }
+
+ if (currentName.length() > 0) {
+ currentName.append(Character.toString('.'));
+ }
+ currentName.append(name);
+
+ String listName = removeIndexes(currentName.toString());
+
+ if (listNameList.contains(listName)) {
+ String n = currentName.toString() + "_length";
+ int len = getInt(properties, n);
+ properties.put(n, String.valueOf(len + 1));
+ currentName.append("[").append(len).append("]");
+ }
+ }
+
+ @Override
+ public void endElement(String uri, String localName, String qName) throws SAXException {
+ super.endElement(uri, localName, qName);
+
+ String name = localName;
+ if (name == null || name.trim().length() == 0) {
+ name = qName;
+ }
+ int i2 = name.indexOf(':');
+ if (i2 >= 0) {
+ name = name.substring(i2 + 1);
+ }
+
+ String s = currentValue.toString().trim();
+ if (s.length() > 0) {
+ properties.put(currentName.toString(), s);
+
+ log.info("Added property: {} : {}", currentName, s);
+ currentValue = new StringBuilder();
+ }
+
+ int i1 = currentName.lastIndexOf("." + name);
+ if (i1 <= 0) {
+ currentName = new StringBuilder();
+ } else {
+ currentName = new StringBuilder(currentName.substring(0, i1));
+ }
+ }
+
+ @Override
+ public void characters(char[] ch, int start, int length) throws SAXException {
+ super.characters(ch, start, length);
+
+ String value = new String(ch, start, length);
+ currentValue.append(value);
+ }
+
+ private static int getInt(Map<String, String> mm, String name) {
+ String s = mm.get(name);
+ if (s == null) {
+ return 0;
+ }
+ return Integer.parseInt(s);
+ }
+
+ private String removeIndexes(String currentName) {
+ StringBuilder b = new StringBuilder();
+ boolean add = true;
+ for (int i = 0; i < currentName.length(); i++) {
+ char c = currentName.charAt(i);
+ if (c == '[') {
+ add = false;
+ } else if (c == ']') {
+ add = true;
+ } else if (add) {
+ b.append(Character.toString(c));
+ }
+ }
+ return b.toString();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java
new file mode 100755
index 0000000..04271e4
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java
@@ -0,0 +1,107 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import io.vavr.control.Try;
+import org.onap.dcae.collectors.restconf.common.AnyNode;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Try;
+import static io.vavr.API.Tuple;
+import static io.vavr.API.unchecked;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f;
+
+public class DMaaPConfigurationParser {
+
+ public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
+ return readFromFile(configLocation)
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
+ }
+
+ private static Try<String> readFromFile(Path configLocation) {
+ return Try(() -> new String(Files.readAllBytes(configLocation)))
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ }
+
+ private static Try<AnyNode> toJSON(String config) {
+ return Try(() -> AnyNode.fromString(config))
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ }
+
+ private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
+ return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ .mapFailure(enhanceError(
+ f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ }
+
+ private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
+ return dMaaPConfig.has("channels");
+ }
+
+ private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
+ return root.get("channels").toList().toMap(
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
+ return root.keys().toMap(
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
+ String topic, List<String> destinations) {
+ return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java
new file mode 100755
index 0000000..36e950f
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import io.vavr.control.Try;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DMaaPEventPublisher implements EventPublisher {
+
+ private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
+ private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
+ private final DMaaPPublishersCache publishersCache;
+ private final Logger outputLogger;
+
+ DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache,
+ Logger outputLogger) {
+ this.publishersCache = DMaaPPublishersCache;
+ this.outputLogger = outputLogger;
+ }
+
+ @Override
+ public void sendEvent(JSONObject event, String domain) {
+ publishersCache.getPublisher(domain)
+ .onEmpty(() ->
+ log.warn(VavrUtils.f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
+ }
+
+ private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, domain, publisher))
+ .onFailure(exc -> closePublisher(event, domain, exc));
+ }
+
+ private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ throws IOException {
+ System.out.println("printing publisher information" + publisher);
+ int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+ if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
+ log.info("Pending messages count: " + pendingMsgs);
+ }
+ String infoMsg = VavrUtils.f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ log.info(infoMsg);
+ outputLogger.info(infoMsg);
+ }
+
+ private void closePublisher(JSONObject event, String domain, Throwable e) {
+ log.error(VavrUtils.f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
+ event, domain), e);
+ publishersCache.closePublisherFor(domain);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java
new file mode 100755
index 0000000..2f9b3ed
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java
@@ -0,0 +1,63 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import io.vavr.control.Try;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class DMaaPPublishersBuilder {
+
+ @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+ static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
+ return Try(() -> builder(config).build())
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ }
+
+ private static PublisherBuilder builder(PublisherConfig config) {
+ if (config.isSecured()) {
+ return authenticatedBuilder(config);
+ } else {
+ return unAuthenticatedBuilder(config);
+ }
+ }
+
+ private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
+ return unAuthenticatedBuilder(config)
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
+ }
+
+ private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
+ return new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java
new file mode 100755
index 0000000..acad96d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f;
+
+public class DMaaPPublishersCache {
+
+ private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
+ private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
+ private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
+
+ DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
+ }
+
+ DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
+ OnPublisherRemovalListener onPublisherRemovalListener,
+ Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
+ }
+
+ Option<CambriaBatchingPublisher> getPublisher(String streamID) {
+ try {
+ return Option(publishersCache.getUnchecked(streamID));
+ } catch (Exception e) {
+ log.warn("Could not create / load Cambria Publisher for streamID", e);
+ return Option.none();
+ }
+ }
+
+ void closePublisherFor(String streamId) {
+ publishersCache.invalidate(streamId);
+ }
+
+ static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
+
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
+ CambriaBatchingPublisher publisher = notification.getValue();
+ if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
+ try {
+ int timeout = 20;
+ TimeUnit unit = TimeUnit.SECONDS;
+ java.util.List<?> stuck = publisher.close(timeout, unit);
+ if (!stuck.isEmpty()) {
+ log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
+ + "%s messages were dropped", stuck.size(), timeout, unit));
+ }
+ } catch (InterruptedException | IOException e) {
+ log.error("Could not close Cambria publisher, some messages might have been dropped", e);
+ }
+ }
+ }
+ }
+
+ class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
+
+ @Override
+ public CambriaBatchingPublisher load(@Nonnull String domain) {
+ return dMaaPConfiguration.get()
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java
new file mode 100755
index 0000000..3d339b3
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java
@@ -0,0 +1,35 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+
+import io.vavr.collection.Map;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+
+public interface EventPublisher {
+
+ static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
+ return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ }
+
+ void sendEvent(JSONObject event, String domain);
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java
new file mode 100755
index 0000000..33fa15b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+
+import java.util.Objects;
+
+public class PublisherConfig {
+ private final List<String> destinations;
+ private final String topic;
+ private String userName;
+ private String password;
+
+ PublisherConfig(List<String> destinations, String topic) {
+ this.destinations = destinations;
+ this.topic = topic;
+ }
+
+ PublisherConfig(List<String> destinations, String topic, String userName, String password) {
+ this.destinations = destinations;
+ this.topic = topic;
+ this.userName = userName;
+ this.password = password;
+ }
+
+ List<String> destinations() {
+ return destinations;
+ }
+
+ String topic() {
+ return topic;
+ }
+
+ Option<String> userName() {
+ return Option.of(userName);
+ }
+
+ Option<String> password() {
+ return Option.of(password);
+ }
+
+ boolean isSecured() {
+ return userName().isDefined() && password().isDefined();
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PublisherConfig that = (PublisherConfig) o;
+ return Objects.equals(destinations, that.destinations) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(destinations, topic, userName, password);
+ }
+
+ @Override
+ public String toString() {
+ return "PublisherConfig{" +
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java
new file mode 100755
index 0000000..77a3052
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.API;
+import io.vavr.API.Match.Case;
+
+import static io.vavr.API.$;
+
+
+public class VavrUtils {
+ private VavrUtils() {
+ // utils aggregator
+ }
+
+ /**
+ * Shortcut for 'string interpolation'
+ */
+ static String f(String msg, Object... args) {
+ return String.format(msg, args);
+ }
+
+ /**
+ * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a
+ * context for errors instead of raw exception.
+ */
+ static Case<Throwable, Throwable> enhanceError(String msg) {
+ return API.Case($(), e -> new RuntimeException(msg, e));
+ }
+}
diff --git a/src/main/scripts/docker_entry.sh b/src/main/scripts/docker_entry.sh
new file mode 100755
index 0000000..78b83f6
--- /dev/null
+++ b/src/main/scripts/docker_entry.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+
+echo "INFO: USING RESTCONF CONTROLLER"
+
+/opt/app/restconfcollector/bin/restConfCollector.sh stop
+/opt/app/restconfcollector/bin/restConfCollector.sh start \ No newline at end of file
diff --git a/src/main/scripts/restConfCollector.sh b/src/main/scripts/restConfCollector.sh
new file mode 100755
index 0000000..88893c0
--- /dev/null
+++ b/src/main/scripts/restConfCollector.sh
@@ -0,0 +1,100 @@
+#!/bin/sh
+
+usage() {
+ echo "restConfCollector.sh <start/stop>"
+}
+
+BASEDIR=/opt/app/restconfcollector
+rm -rf /opt/app/restconfcollector/logs
+mkdir /opt/app/restconfcollector/logs
+cd /opt/app/restconfcollector/logs
+touch console.txt
+cd -
+
+restConfCollector_start() {
+ echo `date +"%Y%m%d.%H%M%S%3N"` - restConfCollector_start | tee -a ${BASEDIR}/logs/console.txt
+ collectorPid=`pgrep -f org.onap.restconf.common`
+
+ if [ ! -z "$collectorPid" ]; then
+ echo "WARNING: restConf Collector already running as PID $collectorPid" | tee -a ${BASEDIR}/logs/console.txt
+ echo "Startup Aborted!!!" | tee -a ${BASEDIR}/logs/console.txt
+ exit 1
+ fi
+
+
+ # run java. The classpath is the etc dir for config files, and the lib dir
+ # for all the jars.
+
+ cd ${BASEDIR}
+ echo "192.168.17.11 onap-message-router" >> /etc/hosts
+ nohup $JAVA -cp "etc${PATHSEP}lib/*" $JAVA_OPTS -Dhttps.protocols=TLSv1.1,TLSv1.2 $MAINCLASS $* &
+ if [ $? -ne 0 ]; then
+ echo "restConf Collector has been started!!!" | tee -a ${BASEDIR}/logs/console.txt
+ fi
+
+
+}
+
+## Pre-setting
+JAVA_HOME=/usr/bin/java
+
+# use JAVA_HOME if provided
+if [ -z "$JAVA_HOME" ]; then
+ echo "ERROR: JAVA_HOME not setup"
+ echo "Startup Aborted!!"
+ exit 1
+else
+ JAVA=$JAVA_HOME
+fi
+
+MAINCLASS=org.onap.dcae.collectors.restconf.common.RestConfCollector
+
+# determine a path separator that works for this platform
+PATHSEP=":"
+case "$(uname -s)" in
+
+ Darwin)
+ ;;
+
+ Linux)
+ ;;
+
+ CYGWIN*|MINGW32*|MSYS*)
+ PATHSEP=";"
+ ;;
+
+ *)
+ ;;
+esac
+
+restConfCollector_stop() {
+ echo `date +"%Y%m%d.%H%M%S%3N"` - collector_stop
+ collectorPid=`pgrep -f org.onap.dcae.collectors.restconf.common`
+ if [ ! -z "$collectorPid" ]; then
+ echo "Stopping PID $collectorPid"
+
+ kill -9 $collectorPid
+ sleep 5
+ if [ ! "$(pgrep -f org.onap.restconf.common)" ]; then
+ echo "restConf Collector has been stopped!!!"
+ else
+ echo "restConf Collector is being stopped!!!"
+ fi
+ else
+ echo "WARNING: No restConf Collector instance is currently running";
+ exit 1
+ fi
+
+}
+
+case $1 in
+ "start")
+ restConfCollector_start | tee -a ${BASEDIR}/logs/console.txt
+ ;;
+ "stop")
+ restConfCollector_stop | tee -a ${BASEDIR}/logs/console.txt
+ ;;
+ *)
+ usage
+ ;;
+esac
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java
new file mode 100755
index 0000000..1b709bd
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java
@@ -0,0 +1,111 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import org.junit.Test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static io.vavr.API.List;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPConfigurationParser.parseToDomainMapping;
+
+public class DMaaPConfigurationParserTest {
+ @Test
+ public void testParseCredentialsForGen2() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull();
+ assertThat(authCredentialsNulls.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsNulls.password().isEmpty()).isTrue();
+ assertThat(authCredentialsNulls.isSecured()).isFalse();
+
+ PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
+ assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
+ assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
+ assertThat(authCredentialsPresent.isSecured()).isTrue();
+
+ PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
+ assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue();
+ assertThat(authCredentialsKeysMissing.isSecured()).isFalse();
+ }
+
+
+ @Test
+ public void testParseCredentialsForLegacy() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull();
+ assertThat(authCredentialsNull.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsNull.password().isEmpty()).isTrue();
+ assertThat(authCredentialsNull.isSecured()).isFalse();
+
+ PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull();
+ assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser");
+ assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword");
+ assertThat(authCredentialsPresent.isSecured()).isTrue();
+
+ PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull();
+ assertThat(authCredentialsMissing.userName().isEmpty()).isTrue();
+ assertThat(authCredentialsMissing.password().isEmpty()).isTrue();
+ assertThat(authCredentialsMissing.isSecured()).isFalse();
+ }
+
+
+ @Test
+ public void testParseGen2() {
+ Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs = parseToDomainMapping(path);
+
+ PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull();
+ assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904"));
+ assertThat(withEventsSegment.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull();
+ assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST"));
+ assertThat(withOtherSegment.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV");
+ }
+
+ @Test
+ public void testParseLegacy() {
+ Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json");
+ Try<Map<String, PublisherConfig>> publisherConfigs =
+ parseToDomainMapping(exemplaryConfig);
+
+ PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull();
+ assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904"));
+ assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull();
+ assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
+ assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV");
+
+ PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull();
+ assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com"));
+ assertThat(urlIsMissing.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV");
+ }
+}
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java
new file mode 100755
index 0000000..f66748c
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java
@@ -0,0 +1,77 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import org.json.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+
+import static io.vavr.API.Option;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DMaaPEventPublisherTest {
+
+ private static final String STREAM_ID = "sampleStreamId";
+
+ private DMaaPEventPublisher eventPublisher;
+ private CambriaBatchingPublisher cambriaPublisher;
+ private DMaaPPublishersCache DMaaPPublishersCache;
+
+ @Before
+ public void setUp() {
+ cambriaPublisher = mock(CambriaBatchingPublisher.class);
+ DMaaPPublishersCache = mock(DMaaPPublishersCache.class);
+ when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher));
+ eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class));
+ }
+
+ @Test
+ public void shouldSendEventToTopic() throws Exception {
+ // given
+ JSONObject event = new JSONObject("{}");
+
+ // when
+ eventPublisher.sendEvent(event, STREAM_ID);
+
+ // then
+ verify(cambriaPublisher).send("MyPartitionKey", event.toString());
+ }
+
+ @Test
+ public void shouldCloseConnectionWhenExceptionOccurred() throws Exception {
+ // given
+ JSONObject event = new JSONObject("{}");
+ given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail"));
+
+ // when
+ eventPublisher.sendEvent(event, STREAM_ID);
+
+ // then
+ verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID);
+ }
+}
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java
new file mode 100755
index 0000000..49f37c3
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java
@@ -0,0 +1,92 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.junit.Before;
+import org.junit.Test;
+import org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Map;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DMaaPPublishersCacheTest {
+
+ private String streamId1;
+ private Map<String, PublisherConfig> dMaaPConfigs;
+
+ @Before
+ public void setUp() {
+ streamId1 = "sampleStream1";
+ dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1"));
+ }
+
+ @Test
+ public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() {
+ // given
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
+
+ // when
+ Option<CambriaBatchingPublisher> firstPublisher = dMaaPPublishersCache.getPublisher(streamId1);
+ Option<CambriaBatchingPublisher> secondPublisher = dMaaPPublishersCache.getPublisher(streamId1);
+
+ // then
+ assertSame("should return same instance", firstPublisher.get(), secondPublisher.get());
+ }
+
+ @Test
+ public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException {
+ // given
+ CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class);
+ DMaaPPublishersCache.CambriaPublishersCacheLoader cacheLoaderMock = mock(DMaaPPublishersCache.CambriaPublishersCacheLoader.class);
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock,
+ new OnPublisherRemovalListener(),
+ dMaaPConfigs);
+ when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1);
+
+ // when
+ dMaaPPublishersCache.getPublisher(streamId1);
+ dMaaPPublishersCache.closePublisherFor(streamId1);
+
+ // then
+ verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS);
+
+ }
+
+ @Test
+ public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() {
+ // given
+ DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs);
+
+ // then
+ assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty());
+ }
+
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java
new file mode 100755
index 0000000..746023d
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.restconftest;
+
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.dcae.collectors.restconf.common.AnyNode;
+
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AnyNodeTest {
+
+ private static final String SAMPLE_JSON_FILEPATH = "{\n"
+ + " \"channels\": [{\n"
+ + " \"one\": \"number1\", \"two\": \"number2\", \"three\": \"number3\"}],\n"
+ + " \"sampleStrList\": [\"1\", \"2\", \"3\", \"4\", \"5\"],\n"
+ + " \"sampleNestedObject\": {\"a\": 1, \"b\": 2},\n"
+ + " \"sampleInt\": 1,\n"
+ + " \"sampleString\": \"str\",\n"
+ + " \"sampleNull\": null\n"
+ + "}\n";
+ private static final Set<String> EXPECTED_JSON_KEYS = Sets
+ .newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull");
+ private static AnyNode node;
+
+
+ @BeforeClass
+ public static void setUpClass() {
+ node = AnyNode.fromString(SAMPLE_JSON_FILEPATH);
+ }
+
+ @Test
+ public void testShouldReturnJsonObjectKeySet() {
+ assertThat(node.keys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS);
+ }
+
+ @Test(expected = ClassCastException.class)
+ public void whenInvokedOnJsonObjInsteadOfJsonArrShouldRaiseRuntimeEx() {
+ node.toList();
+ }
+} \ No newline at end of file
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java
new file mode 100755
index 0000000..0084f40
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.restconftest;
+
+import com.att.nsa.cmdLine.NsaCommandLineUtil;
+import com.att.nsa.drumlin.service.framework.DrumlinServlet;
+import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.junit.Test;
+import org.onap.dcae.collectors.restconf.common.Constants;
+import org.onap.dcae.collectors.restconf.common.RestConfCollector;
+import org.onap.dcae.collectors.restconf.common.RestConfContext;
+import org.onap.dcae.collectors.restconf.common.RestConfProc;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RestConfProcTest {
+
+ private static final URI CONTEXT = URI.create("http://localhost:8080/");
+
+ @Test
+ public void testEstablishPersistentConnection() throws Exception {
+
+ final Map<String, String> argMap = new HashMap<>();
+ final String config = NsaCommandLineUtil.getSetting(argMap, Constants.KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, RestConfCollector.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ RestConfProc restConfProc = new RestConfProc(settings);
+
+ final ResourceConfig resourceConfig = new ResourceConfig(SseResource.class, SseFeature.class);
+ GrizzlyHttpServerFactory.createHttpServer(CONTEXT, resourceConfig);
+ RestConfContext ctx = new RestConfContext();
+ ctx.setAttribute("prop.encoding-json", "encoding-json");
+ ctx.setAttribute("restapi-result.response-code", "200");
+ ctx.setAttribute("restapi-result.ietf-subscribed-notifications:output.identifier", "100");
+
+ Map<String, String> p = new HashMap<>();
+ p.put("sseConnectURL", "http://localhost:8080/ssevents");
+ p.put("subscriberId", "networkId");
+ p.put("responsePrefix", "restapi-result");
+
+ restConfProc.establishPersistentConnection(p, ctx);
+ Thread.sleep(1000);
+ }
+}
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java
new file mode 100755
index 0000000..db81886
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.restconftest;
+
+import org.glassfish.jersey.media.sse.EventOutput;
+import org.glassfish.jersey.media.sse.OutboundEvent;
+import org.glassfish.jersey.media.sse.SseFeature;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import java.io.IOException;
+
+@Path("ssevents")
+public class SseResource {
+
+ @GET
+ @Produces(SseFeature.SERVER_SENT_EVENTS)
+ public EventOutput getServerSentEvents() throws IOException {
+ String data = "{"
+ + "\"ietf-notification:notification\" : {"
+ + " \"eventTime\" : \"2017-10-25T08:22:33.44Z\","
+ + " \"ietf-yang-push:push-change-update\": {"
+ + "\"subscription-id\":\"89\","
+ + "\"datastore-changes\": {"
+ + "\"ietf-yang-patch:yang-patch\":{"
+ + "\"patch-id\":\"1\","
+ + "\"edit\":[{"
+ + "\"edit-id\":\"edit1\","
+ + "\"operation\":\"merge\","
+ + "\"target\":\"/ietf-interfaces:interfaces-state\","
+ + "\"value\": {"
+ + "\"ietf-interfaces:interfaces-state\":{"
+ + "\"interface\": {"
+ + "\"name\":\"eth0\","
+ + "\"oper-status\":\"down\","
+ + "}"
+ + "}"
+ + "}"
+ + "}]"
+ + "}"
+ + "}"
+ + "}"
+ + "}"
+ + "}";
+ final EventOutput result = new EventOutput();
+ result.write(new OutboundEvent.Builder().data(String.class, data).build());
+ result.close();
+ return result;
+ }
+}
diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java
new file mode 100755
index 0000000..6bedc8e
--- /dev/null
+++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.restconftest;
+
+import com.att.nsa.cmdLine.NsaCommandLineUtil;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.Test;
+import org.onap.dcae.collectors.restconf.common.Constants;
+import org.onap.dcae.collectors.restconf.common.RestConfProc;
+
+import java.io.FileReader;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestRestConfCollector {
+ @Test
+ public void testParseCLIArguments() {
+ // given
+ String args[] = {"-a", "aa"};
+ Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ // when
+ nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvReadableTable(argMap));
+
+ // then
+ assertEquals(settings.getString("a", "default"), "aa");
+ }
+
+ @Test
+ public void shouldPutValidRestConfEventOnProcessingQueueWithoutExceptions() throws Exception {
+ // given
+ RestConfProc.fProcessingInputQueue = new LinkedBlockingQueue<>(
+ Constants.KDEFAULT_MAXQUEUEDEVENTS);
+ JsonElement restConfEvent = new JsonParser().parse(new FileReader("src/test/resources/RestConfEvent.json"));
+ JSONObject validRestConfEvent = new JSONObject(restConfEvent.toString());
+ JSONArray jsonArrayMod = new JSONArray().put(validRestConfEvent);
+
+ // then
+ RestConfProc.handleEvents(jsonArrayMod);
+ }
+}
diff --git a/src/test/resources/RestConfEvent.json b/src/test/resources/RestConfEvent.json
new file mode 100755
index 0000000..93d0b64
--- /dev/null
+++ b/src/test/resources/RestConfEvent.json
@@ -0,0 +1,26 @@
+{
+ "ietf-notification:notification" : {
+ "eventTime" : "eventtime",
+ "ietf-yang-push:push-change-update": {
+ "subscription-id":"100",
+ "datastore-changes": {
+ "ietf-yang-patch:yang-patch":{
+ "patch-id":"patch-id",
+ "edit":[{
+ "edit-id":"edit-id",
+ "operation":"create",
+ "target":"target",
+ "value": {
+ "ietf-interfaces:interfaces-state":{
+ "interface": {
+ "name":"eth0",
+ "oper-status":"up"
+ }
+ }
+ }
+ }]
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPCredentialsGen2.json b/src/test/resources/testParseDMaaPCredentialsGen2.json
new file mode 100755
index 0000000..23230c1
--- /dev/null
+++ b/src/test/resources/testParseDMaaPCredentialsGen2.json
@@ -0,0 +1,21 @@
+{
+ "auth-credentials-null": {
+ "aaf_username": null,
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_password": null
+ },
+ "auth-credentials-present": {
+ "aaf_username": "sampleUser",
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ },
+ "aaf_password": "samplePassword"
+ },
+ "auth-credentials-missing": {
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ }
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json
new file mode 100755
index 0000000..ee215e2
--- /dev/null
+++ b/src/test/resources/testParseDMaaPCredentialsLegacy.json
@@ -0,0 +1,26 @@
+{
+ "channels": [
+ {
+ "name": "auth-credentials-null",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
+ "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV",
+ "basicAuthPassword": null,
+ "basicAuthUsername": null
+ },
+ {
+ "name": "auth-credentials-present",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
+ "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV",
+ "basicAuthPassword": "samplePassword",
+ "basicAuthUsername": "sampleUser"
+ },
+ {
+ "name": "auth-credentials-missing",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com",
+ "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ }
+ ]
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPGen2.json b/src/test/resources/testParseDMaaPGen2.json
new file mode 100755
index 0000000..4ef04a2
--- /dev/null
+++ b/src/test/resources/testParseDMaaPGen2.json
@@ -0,0 +1,12 @@
+{
+ "event-segments-with-port": {
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ }
+ },
+ "other-segments-without-ports": {
+ "dmaap_info": {
+ "topic_url": "http://UEBHOST:3904/somethingHere/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ }
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json
new file mode 100755
index 0000000..fda9c60
--- /dev/null
+++ b/src/test/resources/testParseDMaaPLegacy.json
@@ -0,0 +1,21 @@
+{
+ "channels": [
+ {
+ "name": "url-precedes-hosts",
+ "cambria.url": "127.0.0.1:3904",
+ "cambria.hosts": "h1.att.com,h2.att.com",
+ "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ },
+ {
+ "name": "url-key-missing",
+ "cambria.hosts": "h1.att.com,h2.att.com",
+ "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ },
+ {
+ "name": "url-is-null",
+ "cambria.url": null,
+ "cambria.hosts": "h1.att.com,h2.att.com",
+ "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"
+ }
+ ]
+} \ No newline at end of file