summaryrefslogtreecommitdiffstats
path: root/utils/DmaapPublisher
diff options
context:
space:
mode:
authorMichael Lando <ml636r@att.com>2018-03-04 14:53:33 +0200
committerMichael Lando <ml636r@att.com>2018-03-07 13:19:05 +0000
commita5445100050e49e83f73424198d73cd72d672a4d (patch)
treecacf4df817df31be23e4e790d1dda857bdae061e /utils/DmaapPublisher
parent51157f92c21976cba4914c378aaa3cba49826931 (diff)
Sync Integ to Master
Change-Id: I71e3acc26fa612127756ac04073a522b9cc6cd74 Issue-ID: SDC-977 Signed-off-by: Gitelman, Tal (tg851x) <tg851x@intl.att.com>
Diffstat (limited to 'utils/DmaapPublisher')
-rw-r--r--utils/DmaapPublisher/pom.xml131
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java59
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java97
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java149
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java19
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java52
-rw-r--r--utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java19
-rw-r--r--utils/DmaapPublisher/src/main/resources/catalogMgmt.properties35
-rw-r--r--utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml7
-rw-r--r--utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties32
-rw-r--r--utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml6
-rw-r--r--utils/DmaapPublisher/src/main/resources/preferredRouter.txt0
-rw-r--r--utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java22
-rw-r--r--utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties34
-rw-r--r--utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml7
-rw-r--r--utils/DmaapPublisher/src/test/resources/preferredRouter.txt0
16 files changed, 669 insertions, 0 deletions
diff --git a/utils/DmaapPublisher/pom.xml b/utils/DmaapPublisher/pom.xml
new file mode 100644
index 0000000000..6152fdff83
--- /dev/null
+++ b/utils/DmaapPublisher/pom.xml
@@ -0,0 +1,131 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.openecomp.sdc</groupId>
+ <artifactId>dmaap-publisher</artifactId>
+ <version>1.0.0</version>
+
+
+ <properties>
+ <fasterxml.jackson.version>2.8.6</fasterxml.jackson.version>
+ </properties>
+
+ <dependencies>
+ <!--spock testing-->
+ <dependency>
+ <groupId>org.spockframework</groupId>
+ <artifactId>spock-core</artifactId>
+ <version>1.1-groovy-2.4</version>
+ <scope>test</scope>
+ </dependency>
+ <!--groovy-->
+ <dependency>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy</artifactId>
+ <version>2.4.11</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.7</version>
+ </dependency>
+ <dependency>
+ <groupId>com.att.nsa</groupId>
+ <artifactId>dmaapClient</artifactId>
+ <version>0.2.16</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>args4j</groupId>
+ <artifactId>args4j</artifactId>
+ <version>2.33</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>22.0</version>
+ </dependency>
+ <!-- https://mvnrepository.com/artifact/org.yaml/snakeyaml -->
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.18</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.8.47</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.8.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.openecomp.sdc.dmaap.DmaapPublisher</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+</project> \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
new file mode 100644
index 0000000000..38e53c824f
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/CliArgs.java
@@ -0,0 +1,59 @@
+package org.openecomp.sdc.dmaap;
+
+import org.kohsuke.args4j.Option;
+import com.google.common.base.MoreObjects;
+
+public class CliArgs {
+
+ @Option(name="yml",aliases = {"-YML","YML","-yml","-YAML","YAML","-yaml"}, usage="mandatory arg. YAML filename", required=true)
+ private String yamlFilename;
+
+ @Option(name="path",aliases = {"-path","PATH","-PATH"}, usage="mandatory arg. path to the yaml file which contains topic config (publisher data + messages)", required=true)
+ private String yamlPath;
+
+ @Option(name="cr",aliases = {"CR","-cr","-CR"}, usage="optional arg. concurrent requests", required=false)
+ private String concurrentRequests;
+
+ @Option(name="notification",aliases = {"NOTIFICATION","-NOTIFICATION","-notification"}, usage="optional load dynamic messages", required=false)
+ private String notificationData;
+
+ public String getYamlPath() {
+ return yamlPath;
+ }
+
+ public String getYamlFilename() {
+ return yamlFilename;
+ }
+
+ public void setYamlPath(String yamlPath) {
+ this.yamlPath = yamlPath;
+ }
+
+
+ public String getConcurrentRequests() {
+ return concurrentRequests;
+ }
+
+ public void setConcurrentRequests(String concurrentRequests) {
+ this.concurrentRequests = concurrentRequests;
+ }
+
+ public String getNotificationData() {
+ return notificationData;
+ }
+
+
+ public void setYamlFilename(String yamlFilename) {
+ this.yamlFilename = yamlFilename;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("yamlPath", yamlPath)
+ .add("concurrentRequests", concurrentRequests)
+ .toString();
+ }
+
+
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
new file mode 100644
index 0000000000..61e48fa50e
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublishTool.java
@@ -0,0 +1,97 @@
+package org.openecomp.sdc.dmaap;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRPublisher.message;
+
+public class DmaapPublishTool {
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapPublishTool.class);
+ final private TopicConfig topicConfig;
+
+ public DmaapPublishTool(String yamlPath) throws FileNotFoundException {
+ topicConfig = loadTopicConfig(yamlPath);
+ System.out.println("yaml file loaded.");
+ }
+ public DmaapPublishTool(String yamlPath , String notifications) throws FileNotFoundException {
+ topicConfig = loadTopicConfig(yamlPath);
+ if (StringUtils.isNotBlank(notifications) )
+ topicConfig.add( notifications );
+ System.out.println("yaml file loaded.");
+ }
+
+ public void addNotifications(Collection<String> notification){
+ topicConfig.addAll( notification );
+ }
+
+ //safe stream doesn't throw null pointer exception
+ public <T> Collection<T> safe(Collection<T> obj){
+ return Optional.ofNullable(obj).orElse(Collections.emptySet());
+ }
+ public <T> List<T> safe(List<T> obj){
+ return Optional.ofNullable(obj).orElse(Collections.emptyList());
+ }
+
+ public void publish(String path) throws IOException, InterruptedException {
+ MRBatchingPublisher pub = createPublisher( topicConfig, path );
+ System.out.println( "pending message count -> "+pub.getPendingMessageCount() );
+ List<String> list = this.topicConfig.getIncomingTopicMessages();
+ for(String msg : safe(list) ){
+ publishOne( pub , msg );
+ }
+ closePublisher(pub);
+ }
+
+ private MRBatchingPublisher createPublisher(TopicConfig topicConfig,String path) throws IOException {
+ MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(Objects.requireNonNull(Util.toPath(path,topicConfig.getPublisherPropertiesFilePath())));
+ System.out.println("publisher created.");
+ return publisher;
+ }
+
+ private TopicConfig loadTopicConfig(String yamlPath) throws FileNotFoundException {
+ File yamlFile = new File(Objects.requireNonNull(yamlPath));
+ InputStream input = new FileInputStream(yamlFile);
+ Yaml yamlHelper = new Yaml();
+ return yamlHelper.loadAs(input, TopicConfig.class);
+ }
+
+ private void publishOne(MRBatchingPublisher pub, String msg) throws IOException, InterruptedException {
+ System.out.println("sending: " + msg);
+ pub.send(msg);
+ System.out.println("message sent.");
+ }
+
+ private void closePublisher(MRBatchingPublisher pub) throws IOException, InterruptedException {
+ System.out.println("closing publisher...");
+ // close the publisher to make sure everything's sent before exiting. The batching
+ // publisher interface allows the app to get the set of unsent messages. It could
+ // write them to disk, for example, to try to send them later.
+ final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
+ if(!stuck.isEmpty())
+ {
+ final String errMsg = stuck.size() + " messages unsent";
+ logger.error(errMsg);
+ System.err.println(errMsg);
+ }
+ else
+ {
+ final String successMsg = "Clean exit; all messages sent.";
+ logger.info(successMsg);
+ System.out.println(successMsg);
+ }
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
new file mode 100644
index 0000000000..fd558356ed
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/DmaapPublisher.java
@@ -0,0 +1,149 @@
+package org.openecomp.sdc.dmaap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.kohsuke.args4j.CmdLineException;
+import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.OptionHandlerFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+
+
+import static org.openecomp.sdc.dmaap.Util.*;
+
+public class DmaapPublisher {
+ private static final Logger logger = LoggerFactory.getLogger(DmaapPublisher.class);
+ private static RequestManager requestManager ;
+ private static final ConcurrentLinkedDeque notificationBuffer = new ConcurrentLinkedDeque();
+
+
+ private static final List<Long> registeredTasks = new CopyOnWriteArrayList<>();
+ private DmaapPublisher() {}
+
+ public static void add(String notification){
+ notificationBuffer.add( notification );
+ }
+ public static void addAll(List<String> notifications){
+ notificationBuffer.addAll( notifications );
+ }
+ public static void main(String[] args) {
+ doPublish(args);
+ }
+
+ private static void doPublish( String[] args ) {
+ CliArgs cliArgs = new CliArgs();
+ CmdLineParser parser = new CmdLineParser(cliArgs);
+
+ try {
+ // parse the arguments.
+ parser.parseArgument( args );
+ doPublish( cliArgs );
+ }
+ catch(CmdLineException e) {
+ logger.error("#doPublish - failed to parse arguments.", e);
+ printUsage(parser, e);
+ return;
+ }
+ }
+
+ public static void doPublish( CliArgs cliArgs ){
+ try {
+ // parse the arguments.
+ DmaapPublishTool tool = new DmaapPublishTool( toPath(cliArgs.getYamlPath() , cliArgs.getYamlFilename()) , cliArgs.getNotificationData() );
+ Collection<String> notifications = new ArrayList<String>( notificationBuffer );
+ tool.addNotifications( notifications );
+ notificationBuffer.removeAll(notifications);
+ Integer concurrentRequestCount = 1;
+ if ( StringUtils.isNotBlank( cliArgs.getConcurrentRequests() ) )
+ concurrentRequestCount = Integer.parseInt( cliArgs.getConcurrentRequests() );
+ requestManager = new RequestManager( concurrentRequestCount );
+
+ IntStream.range(0,concurrentRequestCount).forEach( it -> {
+ //region - report upon finish mechanishem
+ long ticket = System.nanoTime();
+ registeredTasks.add( ticket );
+ Consumer callback = ( uniqueTicket ) -> {
+ synchronized ( registeredTasks ){
+ registeredTasks.remove( (long)uniqueTicket );
+ registeredTasks.notifyAll();
+ }};
+
+ RunnableReporter task = new RunnableReporter( ticket , tool , cliArgs , callback );
+ requestManager.getExecutor().execute( task ) ;
+ });
+ }
+ catch(NumberFormatException e) {
+ logger.error("#doPublish - failed to parse argument CR.", e);
+ return;
+ }
+ catch(Exception e) {
+ logger.error("#doPublish - failed to publish.", e);
+ }
+ }
+
+ public static class RunnableReporter implements Runnable{
+
+ final private long ticket ;
+ final private DmaapPublishTool tool;
+ final private CliArgs cliArgs;
+ final Consumer reporter;
+
+ public RunnableReporter(final long ticket , final DmaapPublishTool tool , final CliArgs args , Consumer reporter){
+ this.ticket = ticket ;
+ this.tool = tool ;
+ this.cliArgs = args ;
+ this.reporter = reporter;
+ }
+ @Override
+ public void run() {
+ try {
+ tool.publish( cliArgs.getYamlPath() );
+ reporter.accept(ticket);
+ }catch(IOException e){
+ logger.error("#doPublish - failed to publish.", e);
+ }catch(InterruptedException e){
+ logger.error("#doPublish - cannot complete publish, thread interuppted.", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+
+ public static List<Long> getRegisteredTasks() {
+ return registeredTasks;
+ }
+
+ public static void preparePublish( String path, String filename , String concurrentRequests ){
+
+ CliArgs cliArgs = new CliArgs();
+ if ( StringUtils.isNotBlank( filename ) )
+ cliArgs.setYamlFilename( filename );
+ if ( StringUtils.isNotBlank( path ) )
+ cliArgs.setYamlPath( path );
+ if ( NumberUtils.isCreatable( concurrentRequests ) )
+ cliArgs.setConcurrentRequests( concurrentRequests );
+
+ doPublish( cliArgs );
+
+ }
+
+
+ private static void printUsage(CmdLineParser parser, CmdLineException e) {
+ System.err.println( e.getMessage() );
+ System.err.println("java DmaapPublisher [options...] arguments...");
+ // print the list of available options
+ parser.printUsage(System.err);
+ System.err.println();
+ // print option sample. This is useful some time
+ System.err.println(" Example: java DmaapPublisher " + parser.printExample(OptionHandlerFilter.ALL));
+
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
new file mode 100644
index 0000000000..597baac2bd
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/RequestManager.java
@@ -0,0 +1,19 @@
+package org.openecomp.sdc.dmaap;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public class RequestManager {
+
+ private Executor executor;
+
+ public RequestManager(int poolSize ){
+ int sz = Math.max( poolSize , 1);
+ int recommendedMaxSz = Runtime.getRuntime().availableProcessors() * 2;
+ executor = Executors.newFixedThreadPool( Math.min( sz , recommendedMaxSz ) );
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
new file mode 100644
index 0000000000..a5b43ad0d5
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/TopicConfig.java
@@ -0,0 +1,52 @@
+package org.openecomp.sdc.dmaap;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class TopicConfig {
+
+ private String publisherPropertiesFilePath;
+ private String[] topicMessages; //messages from file
+ private final List<String> incomingTopicMessages = Collections.synchronizedList( new ArrayList<String>() ); //incoming messages from network stream|Main
+
+ public String getPublisherPropertiesFilePath() {
+ return publisherPropertiesFilePath;
+ }
+ public void setPublisherPropertiesFilePath(String publisherPropertiesFilePath) {
+ this.publisherPropertiesFilePath = publisherPropertiesFilePath;
+ }
+
+ public List<String> getIncomingTopicMessages() {
+ return incomingTopicMessages;
+ }
+ public String[] getTopicMessages() {
+ return topicMessages;
+ }
+ //add incoming message
+ public TopicConfig add( String notifications ){
+ incomingTopicMessages.add( notifications);
+ return this;
+ }
+
+ public TopicConfig addAll( Collection<String> notifications ){
+ incomingTopicMessages.addAll( notifications );
+ return this;
+ }
+
+ public void setTopicMessages(String[] topicMessages) {
+ this.topicMessages = topicMessages;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("publisherPropertiesFilePath", publisherPropertiesFilePath)
+ .add("topicMessages", topicMessages)
+ .toString();
+ }
+
+}
diff --git a/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
new file mode 100644
index 0000000000..491b07abdd
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/java/org/openecomp/sdc/dmaap/Util.java
@@ -0,0 +1,19 @@
+package org.openecomp.sdc.dmaap;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.nio.file.InvalidPathException;
+
+public class Util {
+
+ public static String toPath(String path , String filename) throws InvalidPathException{
+ if (StringUtils.isNotBlank(path) ){
+ if (path.trim().endsWith("/") || path.trim().endsWith("/")){
+ return path+(filename!=null ? filename : "");
+ }
+ return path+"/"+(filename!=null ? filename : "");
+
+ }
+ throw new InvalidPathException("wrong path configuration cannot find path -> ",path);
+ }
+}
diff --git a/utils/DmaapPublisher/src/main/resources/catalogMgmt.properties b/utils/DmaapPublisher/src/main/resources/catalogMgmt.properties
new file mode 100644
index 0000000000..ff739f1e3f
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogMgmt.properties
@@ -0,0 +1,35 @@
+TransportType=DME2
+Latitude =32.109333
+Longitude =34.855499
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =https
+MethodType =POST
+username = m09875@sdc.att.com
+password =Aa123456
+contenttype = application/json
+Authorization = Basic bTEzMzMxQGNjZC5hdHQuY29tOkFhMTIzNDU2
+authKey=
+authDate=
+#Dmaap Server Url port 3904-HTTP 3905-https
+host=olsd004.wnsnet.attws.com:3905
+###topic=com.att.ccd.CCD-CatalogManagement-go539p or com.att.sdc.SDCforTestDev | com.att.sdc.23911-SDCforTestDev-v001
+#com.att.sdc.23911-scdc001dev001test-v1
+topic=com.att.sdc.23911-SDCforTestDev-v001
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=src/main/resources/preferredRouter.txt
+MessageSentThreadOccurance=50 \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml b/utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml
new file mode 100644
index 0000000000..04ac9fb992
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogMgmt.yaml
@@ -0,0 +1,7 @@
+publisherPropertiesFilePath: "catalogMgmt.properties"
+topicMessages:
+ - "{\"operationalEnvironmentId\": \"1234\",\"operationalEnvironmentName\":\"Op Env Name\",\"operationalEnvironmentType\":\"ECOMP\",\"tenantContext\":\"Test\",\"workloadContext\":\"VNF_E2E-IST\",\"action\":\"CREATE\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\"],\"republish\":\"No\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"logo\"],\"republish\":\"No\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"availabilitymatrix\"],\"republish\":\"No\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\",\"availabilitymatrix\"],\"republish\":\"No\"}" \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties
new file mode 100644
index 0000000000..119c94ef3b
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.properties
@@ -0,0 +1,32 @@
+TransportType=DME2
+Latitude =32.109333
+Longitude =34.855499
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =https
+MethodType =POST
+username =m13331@ccd.att.com
+password =Aa123456
+contenttype = application/json
+authKey=
+authDate=
+host=olsd004.wnsnet.attws.com:3904
+###topic=com.att.ccd.CCD-CatalogWorkflowManagement-go539p-v1
+topic=com.att.ccd.CCD-CatalogWorkflowManagement-v1
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=resources/preferredRouter.txt
+MessageSentThreadOccurance=50 \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml
new file mode 100644
index 0000000000..da2ebd635a
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/catalogWfMgmt.yaml
@@ -0,0 +1,6 @@
+publisherPropertiesFilePath: "resources/catalogWfMgmt.properties"
+topicMessages:
+ - "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Activate\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Rollback\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Activate\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"action\":\"Activate\"}" \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/main/resources/preferredRouter.txt b/utils/DmaapPublisher/src/main/resources/preferredRouter.txt
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/utils/DmaapPublisher/src/main/resources/preferredRouter.txt
diff --git a/utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java b/utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java
new file mode 100644
index 0000000000..19dbdeaed4
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/java/org/openecomp/sdc/dmaap/DmaapPublisherTest.java
@@ -0,0 +1,22 @@
+package org.openecomp.sdc.dmaap;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Paths;
+
+import static org.junit.Assert.*;
+
+public class DmaapPublisherTest {
+ @Test
+ public void main() throws Exception {
+ File resource = new File("src/test/resources");
+ String absPath = resource.getAbsolutePath();
+
+ String msg = "{\"operationalEnvironmentId\":\"12345\",\"operationalEnvironmentName\":\"Op_Env_Name\",\"operationalEnvironmentType\":\"ECOMP\",\"tenantContext\":\"Test\",\"workloadContext\":\"VNF_E2E-IST\",\"action\":\"CREATE\"}";
+ String cmd = "-cr 5 "+ "-notification=" + msg+ " -path "+absPath+" -yaml catalogMgmtTest.yaml" ;
+ DmaapPublisher.main( cmd.split(" ") );
+ Thread.sleep(10000);
+ }
+} \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties
new file mode 100644
index 0000000000..7f922214a9
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.properties
@@ -0,0 +1,34 @@
+TransportType=DME2
+Latitude =32.109333
+Longitude =34.855499
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =https
+MethodType =POST
+username = m09875@sdc.att.com
+password =Aa123456
+contenttype = application/json
+Authorization = Basic bTEzMzMxQGNjZC5hdHQuY29tOkFhMTIzNDU2
+authKey=
+authDate=
+#Dmaap Server Url port 3904-HTTP 3905-https
+host=olsd004.wnsnet.attws.com:3905
+###topic=com.att.ccd.CCD-CatalogManagement-go539p or com.att.sdc.SDCforTestDev | com.att.sdc.23911-SDCforTestDev-v001
+topic=com.att.sdc.23911-SDCforTestDev-v001
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=src/test/resources/preferredRouter.txt
+MessageSentThreadOccurance=50 \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml
new file mode 100644
index 0000000000..f55641d859
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/resources/catalogMgmtTest.yaml
@@ -0,0 +1,7 @@
+publisherPropertiesFilePath: "catalogMgmtTest.properties"
+topicMessages:
+
+ - "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\"],\"republish\":\"No\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"logo\"],\"republish\":\"No\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"availabilitymatrix\"],\"republish\":\"No\"}"
+ #- "{\"transactionId\": \"221e8cbe-493d-4848-b46c-a552b8928075\",\"notificationReason\":[\"product\",\"availabilitymatrix\"],\"republish\":\"No\"}" \ No newline at end of file
diff --git a/utils/DmaapPublisher/src/test/resources/preferredRouter.txt b/utils/DmaapPublisher/src/test/resources/preferredRouter.txt
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/utils/DmaapPublisher/src/test/resources/preferredRouter.txt