From 7d45c179879363222fcf49b30f75837f66d7f423 Mon Sep 17 00:00:00 2001 From: Varun Gudisena Date: Thu, 31 Aug 2017 10:44:28 -0500 Subject: Revert package name changes Reverted package name changes to avoid any potential issues. Renamed maven group id only. Issue-id: DMAAP-74 Change-Id: I36c2aef063050c265640b79e6dc0e8ab7add8d22 Signed-off-by: Varun Gudisena --- .../att/nsa/mr/test/clients/ConsolePublisher.java | 87 +++++++++++++++++++ .../nsa/mr/test/clients/ProtocolTypeConstants.java | 46 ++++++++++ .../att/nsa/mr/test/clients/SampleConsumer.java | 87 +++++++++++++++++++ .../att/nsa/mr/test/clients/SamplePublisher.java | 86 +++++++++++++++++++ .../nsa/mr/test/clients/SimpleExampleConsumer.java | 86 +++++++++++++++++++ .../SimpleExampleConsumerWithReturnResponse.java | 90 ++++++++++++++++++++ .../mr/test/clients/SimpleExamplePublisher.java | 98 ++++++++++++++++++++++ .../SimpleExamplePublisherWithResponse.java | 84 +++++++++++++++++++ 8 files changed, 664 insertions(+) create mode 100644 src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java create mode 100644 src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java (limited to 'src/main/java/com/att/nsa/mr/test/clients') diff --git a/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java new file mode 100644 index 0000000..2294d7b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java @@ -0,0 +1,87 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * A simple publisher that reads from std in, sending each line as a message. + * @author author + */ +public class ConsolePublisher +{ + public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException + { + // read the hosts(s) from the command line + final String hosts = ( args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com" ); + + // read the topic name from the command line + final String topic = ( args.length > 1 ? args[1] : "TEST-TOPIC" ); + + // read the topic name from the command line + final String partition = ( args.length > 2 ? args[2] : UUID.randomUUID ().toString () ); + + // set up some batch limits and the compression flag + final int maxBatchSize = 100; + final long maxAgeMs = 250; + final boolean withGzip = false; + + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip ); + + final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) ); + try + { + String line = null; + while ( ( line = cin.readLine () ) != null ) + { + pub.send ( partition, line ); + } + } + finally + { + List leftovers = null; + try + { + leftovers = pub.close ( 10, TimeUnit.SECONDS ); + } + catch ( InterruptedException e ) + { + System.err.println ( "Send on close interrupted." ); + } + for ( message m : leftovers ) + { + System.err.println ( "Unsent message: " + m.fMsg ); + } + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java new file mode 100644 index 0000000..6e86d47 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java @@ -0,0 +1,46 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.mr.test.clients; + +/** + * @author author + * + */ +public enum ProtocolTypeConstants { + + DME2("DME2"), + AAF_AUTH("HTTPAAF"), + AUTH_KEY("HTTPAUTH"); + + private String value; + + private ProtocolTypeConstants(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java new file mode 100644 index 0000000..8e1c0e0 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java @@ -0,0 +1,87 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.IOException; +import java.util.LinkedList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SampleConsumer { + public static void main ( String[] args ) + { + final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); + + + LOG.info("Sample Consumer Class executing"); + final String topic = "com.att.app.dmaap.mr.testingTopic"; + final String url = ( args.length > 1 ? args[1] : "localhost:8181" ); + final String group = ( args.length > 2 ? args[2] :"grp" ); + /*final String id = ( args.length > 3 ? args[3] : "0" );*/ + final String id = ( args.length > 3 ? args[3] : "1" ); + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + final LinkedList urlList = new LinkedList (); + for ( String u : url.split ( "," ) ) + { + urlList.add ( u ); + } + + final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ); + try + { + while ( true ) + { + for ( String msg : cc.fetch () ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + LOG.info ( "" + (++count) + ": " + msg ); + } + + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + //System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + LOG.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + LOG.info ( "" + (++count) + ": consumed message" ); + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java new file mode 100644 index 0000000..36c2b7f --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.MRPublisher.message; + +public class SamplePublisher { + public static void main ( String[] args ) throws IOException, InterruptedException + { + final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); + // read the hosts(s) from the command line + final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" ); + + // read the topic name from the command line + //final String topic = ( args.length > 1 ? args[1] : "MY-EXAMPLE-TOPIC" ); + final String topic = ( args.length > 1 ? args[1] : "com.att.app.dmaap.mr.testingTopic" ); + + // set up some batch limits and the compression flag + final int maxBatchSize = 100; + final int maxAgeMs = 250; + final boolean withGzip = false; + + // create our publisher + + final MRBatchingPublisher pub = new PublisherBuilder (). + usingHosts ( hosts ). + onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs). + authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ). + build () + ; + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "name", "tttttttttttttttt" ); + msg1.put ( "greeting", "ooooooooooooooooo" ); + pub.send ( "MyPartitionKey", msg1.toString () ); + + final JSONObject msg2 = new JSONObject (); + msg2.put ( "now", System.currentTimeMillis () ); + pub.send ( "MyOtherPartitionKey", msg2.toString () ); + + // ... + + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + LOG.warn ( stuck.size() + " messages unsent" ); + } + else + { + LOG.info ( "Clean exit; all messages sent." ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java new file mode 100644 index 0000000..3a131a0 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + + + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.Properties; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SimpleExampleConsumer +{ + + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public static void main ( String[] args ) + { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + try + { + String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" ); + while ( true ) + { + for ( String msg : cc.fetch () ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + System.out.println(msg); + } + + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java new file mode 100644 index 0000000..b3c167b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java @@ -0,0 +1,90 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.Properties; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +public class SimpleExampleConsumerWithReturnResponse { + + + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public static void main ( String[] args ) + { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + try + { + String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" ); + while ( true ) + { + MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse(); + System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode()); + + System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage()); + + System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages()); + /*for ( String msg : mrConsumerResponse.getActualMessages() ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + System.out.println(msg); + }*/ + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } + +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java new file mode 100644 index 0000000..01e770e --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java @@ -0,0 +1,98 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * An example of how to use the Java publisher. + * @author author + */ +public class SimpleExamplePublisher +{ + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception + { + + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "Name", "Sprint" ); + //msg1.put ( "greeting", "Hello .." ); + pub.send ( "First cambria messge" ); + pub.send ( "MyPartitionKey", msg1.toString () ); + + final JSONObject msg2 = new JSONObject (); + //msg2.put ( "mrclient1", System.currentTimeMillis () ); + + + // ... + + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + System.err.println ( stuck.size() + " messages unsent" ); + } + else + { + System.out.println ( "Clean exit; all messages sent." ); + } + } + + public static void main(String []args) throws InterruptedException, Exception{ + + String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; + + SimpleExamplePublisher publisher = new SimpleExamplePublisher(); + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + publisher.publishMessage("/src/main/resources/dme2/producer.properties"); + } + +} + diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java new file mode 100644 index 0000000..4914688 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java @@ -0,0 +1,84 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.json.JSONObject; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.response.MRPublisherResponse; + /** + *An example of how to use the Java publisher. + * @author author + * + */ + public class SimpleExamplePublisherWithResponse + { + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + + public static void main(String []args) throws InterruptedException, Exception{ + + String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; + String msgCount = args[0]; + SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + int i=0; + while (i< Integer.valueOf(msgCount)) + { + publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount)); + i++; + } + } + + public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException, Exception + { + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true); + // publish some messages + final JSONObject msg1 = new JSONObject (); + + msg1.put ( "Partition:1", "Message:"+count); + msg1.put ( "greeting", "Hello .." ); + + + pub.send ( "1", msg1.toString()); + pub.send ( "1", msg1.toString()); + + MRPublisherResponse res= pub.sendBatchWithResponse(); + + System.out.println("Pub response->"+res.toString()); + } + + + } -- cgit 1.2.3-korg