aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/test/clients
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/test/clients')
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java95
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java44
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java87
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java86
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java84
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java95
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java98
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java84
8 files changed, 673 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java
new file mode 100644
index 0000000..62e4cda
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.mr.test.clients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRPublisher.message;
+
+/**
+ * A simple publisher that reads from std in, sending each line as a message.
+ * @author author
+ */
+public class ConsolePublisher
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(ConsolePublisher.class);
+ private ConsolePublisher() {
+ }
+ public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException
+ {
+ // read the hosts(s) from the command line
+ final String hosts = args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com";
+
+ // read the topic name from the command line
+ final String topic = args.length > 1 ? args[1] : "TEST-TOPIC";
+
+ // read the topic name from the command line
+ final String partition = args.length > 2 ? args[2] : UUID.randomUUID ().toString ();
+
+ // set up some batch limits and the compression flag
+ final int maxBatchSize = 100;
+ final long maxAgeMs = 250;
+ final boolean withGzip = false;
+
+ // create our publisher
+ final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip );
+
+ final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) );
+ try
+ {
+ String line = null;
+ while ( ( line = cin.readLine () ) != null )
+ {
+ pub.send ( partition, line );
+ }
+ }
+ finally
+ {
+ List<message> leftovers = null;
+ try
+ {
+ leftovers = pub.close ( 10, TimeUnit.SECONDS );
+ }
+ catch ( InterruptedException e )
+ {
+ logger.error( "Send on close interrupted." );
+ Thread.currentThread().interrupt();
+ }
+ for ( message m : leftovers )
+ {
+ logger.error( "Unsent message: " + m.fMsg );
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java
new file mode 100644
index 0000000..6ece7f7
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package org.onap.dmaap.mr.test.clients;
+
+/**
+ * @author author
+ *
+ */
+public enum ProtocolTypeConstants {
+
+ DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH");
+
+ private String value;
+
+ private ProtocolTypeConstants(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java b/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java
new file mode 100644
index 0000000..1f78be1
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java
@@ -0,0 +1,87 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.test.clients;
+
+import java.util.LinkedList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+
+public class SampleConsumer {
+ private SampleConsumer() {
+ }
+ public static void main ( String[] args )
+ {
+ final Logger log = LoggerFactory.getLogger(SampleConsumer.class);
+
+
+ log.info("Sample Consumer Class executing");
+ final String topic = "com.att.app.dmaap.mr.testingTopic";
+ final String url = ( args.length > 1 ? args[1] : "localhost:8181" );
+ final String group = ( args.length > 2 ? args[2] :"grp" );
+
+ final String id = ( args.length > 3 ? args[3] : "1" );
+
+ long count = 0;
+ long nextReport = 5000;
+
+ final long startMs = System.currentTimeMillis ();
+
+ final LinkedList<String> urlList = new LinkedList<> ();
+ for ( String u : url.split ( "," ) )
+ {
+ urlList.add ( u );
+ }
+
+ final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" );
+ try
+ {
+ while ( true )
+ {
+ for ( String msg : cc.fetch () )
+ {
+ log.info ( "" + (++count) + ": " + msg );
+ }
+
+ if ( count > nextReport )
+ {
+ nextReport += 5000;
+
+ final long endMs = System.currentTimeMillis ();
+ final long elapsedMs = endMs - startMs;
+ final double elapsedSec = elapsedMs / 1000.0;
+ final double eps = count / elapsedSec;
+ log.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+ }
+ log.info ( "" + (++count) + ": consumed message" );
+ }
+ }
+ catch ( Exception x )
+ {
+ log.error( x.getClass().getName () + ": " + x.getMessage () );
+ throw new IllegalArgumentException(x);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java
new file mode 100644
index 0000000..29d7f85
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.test.clients;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRClientBuilders.PublisherBuilder;
+import org.onap.dmaap.mr.client.MRPublisher.message;
+
+public class SamplePublisher {
+ public static void main ( String[] args ) throws IOException, InterruptedException
+ {
+ final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class);
+ // read the hosts(s) from the command line
+ final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" );
+
+ // read the topic name from the command line
+
+ final String topic = ( args.length > 1 ? args[1] : "com.att.app.dmaap.mr.testingTopic" );
+
+ // set up some batch limits and the compression flag
+ final int maxBatchSize = 100;
+ final int maxAgeMs = 250;
+ final boolean withGzip = false;
+
+ // create our publisher
+
+ final MRBatchingPublisher pub = new PublisherBuilder ().
+ usingHosts ( hosts ).
+ onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs).
+ authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ).
+ build ()
+ ;
+ // publish some messages
+ final JSONObject msg1 = new JSONObject ();
+ msg1.put ( "name", "tttttttttttttttt" );
+ msg1.put ( "greeting", "ooooooooooooooooo" );
+ pub.send ( "MyPartitionKey", msg1.toString () );
+
+ final JSONObject msg2 = new JSONObject ();
+ msg2.put ( "now", System.currentTimeMillis () );
+ pub.send ( "MyOtherPartitionKey", msg2.toString () );
+
+ // ...
+
+ // close the publisher to make sure everything's sent before exiting. The batching
+ // publisher interface allows the app to get the set of unsent messages. It could
+ // write them to disk, for example, to try to send them later.
+ final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
+ if ( stuck.isEmpty())
+ {
+ LOG.warn ( stuck.size() + " messages unsent" );
+ }
+ else
+ {
+ LOG.info ( "Clean exit; all messages sent." );
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java
new file mode 100644
index 0000000..a7f283c
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.mr.test.clients;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+
+public class SimpleExampleConsumer {
+
+ static FileWriter routeWriter = null;
+ static Properties props = null;
+ static FileReader routeReader = null;
+
+ public static void main(String[] args) {
+ final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class);
+
+ long count = 0;
+ long nextReport = 5000;
+
+ final long startMs = System.currentTimeMillis();
+
+ try {
+ String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
+
+ File fo = new File(routeFilePath);
+ if (!fo.exists()) {
+ routeWriter = new FileWriter(new File(routeFilePath));
+ }
+ routeReader = new FileReader(new File(routeFilePath));
+ props = new Properties();
+ final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties");
+ int i = 0;
+ while (i < 10) {
+ Thread.sleep(2);
+ i++;
+ for (String msg : cc.fetch()) {
+ // System.out.println ( "" + (++count) + ": " + msg );
+ System.out.println(msg);
+ }
+
+ if (count > nextReport) {
+ nextReport += 5000;
+
+ final long endMs = System.currentTimeMillis();
+ final long elapsedMs = endMs - startMs;
+ final double elapsedSec = elapsedMs / 1000.0;
+ final double eps = count / elapsedSec;
+ System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
+ }
+ }
+ } catch (Exception x) {
+ System.err.println(x.getClass().getName() + ": " + x.getMessage());
+ LOG.error("exception: ", x);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
new file mode 100644
index 0000000..2e514b0
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
@@ -0,0 +1,95 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.test.clients;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+
+public class SimpleExampleConsumerWithReturnResponse {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumerWithReturnResponse.class);
+
+ static FileWriter routeWriter= null;
+ static Properties props=null;
+ static FileReader routeReader=null;
+ public static void main ( String[] args )
+ {
+
+ long count = 0;
+ long nextReport = 5000;
+
+ final long startMs = System.currentTimeMillis ();
+
+ try
+ {
+ String routeFilePath="src/main/resources/dme2/preferredRoute.txt";
+
+
+ File fo= new File(routeFilePath);
+ if(!fo.exists()){
+ routeWriter=new FileWriter(new File (routeFilePath));
+ }
+ routeReader= new FileReader(new File (routeFilePath));
+ props= new Properties();
+ final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" );
+ while ( true )
+ {
+ MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse();
+ System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode());
+
+ System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage());
+
+ System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages());
+ /*for ( String msg : mrConsumerResponse.getActualMessages() )
+ {
+ //System.out.println ( "" + (++count) + ": " + msg );
+ System.out.println(msg);
+ }*/
+ if ( count > nextReport )
+ {
+ nextReport += 5000;
+
+ final long endMs = System.currentTimeMillis ();
+ final long elapsedMs = endMs - startMs;
+ final double elapsedSec = elapsedMs / 1000.0;
+ final double eps = count / elapsedSec;
+ System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+ }
+ }
+ }
+ catch ( Exception x )
+ {
+ System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
+ LOG.error("exception: ", x);
+ }
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java
new file mode 100644
index 0000000..770d916
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java
@@ -0,0 +1,98 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.mr.test.clients;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONObject;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRPublisher.message;
+
+/**
+ * An example of how to use the Java publisher.
+ * @author author
+ */
+public class SimpleExamplePublisher
+{
+ static FileWriter routeWriter= null;
+ static Properties props=null;
+ static FileReader routeReader=null;
+ public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception
+ {
+
+ // create our publisher
+ final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
+ // publish some messages
+ final JSONObject msg1 = new JSONObject ();
+ msg1.put ( "Name", "Sprint" );
+
+ pub.send ( "First cambria messge" );
+ pub.send ( "MyPartitionKey", msg1.toString () );
+
+ final JSONObject msg2 = new JSONObject ();
+
+
+
+ // ...
+
+ // close the publisher to make sure everything's sent before exiting. The batching
+ // publisher interface allows the app to get the set of unsent messages. It could
+ // write them to disk, for example, to try to send them later.
+ final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
+ if ( stuck.isEmpty() )
+ {
+ System.err.println ( stuck.size() + " messages unsent" );
+ }
+ else
+ {
+ System.out.println ( "Clean exit; all messages sent." );
+ }
+ }
+
+ public static void main(String []args) throws InterruptedException, Exception{
+
+ String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
+
+ SimpleExamplePublisher publisher = new SimpleExamplePublisher();
+
+
+ File fo= new File(routeFilePath);
+ if(!fo.exists()){
+ routeWriter=new FileWriter(new File (routeFilePath));
+ }
+ routeReader= new FileReader(new File (routeFilePath));
+ props= new Properties();
+ publisher.publishMessage("/src/main/resources/dme2/producer.properties");
+ }
+
+}
+
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java
new file mode 100644
index 0000000..33d1164
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.test.clients;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.json.JSONObject;
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.response.MRPublisherResponse;
+ /**
+ *An example of how to use the Java publisher.
+ * @author author
+ *
+ */
+ public class SimpleExamplePublisherWithResponse
+ {
+ static FileWriter routeWriter= null;
+ static Properties props=null;
+ static FileReader routeReader=null;
+
+ public static void main(String []args) throws InterruptedException, Exception{
+
+ String routeFilePath="src/main/resources/dme2/preferredRoute.txt";
+ String msgCount = args[0];
+ SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse();
+ File fo= new File(routeFilePath);
+ if(!fo.exists()){
+ routeWriter=new FileWriter(new File (routeFilePath));
+ }
+ routeReader= new FileReader(new File (routeFilePath));
+ props= new Properties();
+ int i=0;
+ while (i< Integer.valueOf(msgCount))
+ {
+ publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount));
+ i++;
+ }
+ }
+
+ public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException
+ {
+ // create our publisher
+ final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true);
+ // publish some messages
+ final JSONObject msg1 = new JSONObject ();
+
+ msg1.put ( "Partition:1", "Message:"+count);
+ msg1.put ( "greeting", "Hello .." );
+
+
+ pub.send ( "1", msg1.toString());
+ pub.send ( "1", msg1.toString());
+
+ MRPublisherResponse res= pub.sendBatchWithResponse();
+
+ System.out.println("Pub response->"+res.toString());
+ }
+
+
+ }