diff options
author | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:44:28 -0500 |
---|---|---|
committer | Varun Gudisena <vg411h@att.com> | 2017-08-31 10:44:41 -0500 |
commit | 7d45c179879363222fcf49b30f75837f66d7f423 (patch) | |
tree | c5a344247515c1d8b74a6cc74bcea63541e4b46f /src/main/java/com/att/nsa/mr/test | |
parent | cc9de9bc6803212f0233e0e1bf06aa63fe8b7a6a (diff) |
Revert package name changes
Reverted package name changes to avoid any potential issues. Renamed maven
group id only.
Issue-id: DMAAP-74
Change-Id: I36c2aef063050c265640b79e6dc0e8ab7add8d22
Signed-off-by: Varun Gudisena <vg411h@att.com>
Diffstat (limited to 'src/main/java/com/att/nsa/mr/test')
10 files changed, 1016 insertions, 0 deletions
diff --git a/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java new file mode 100644 index 0000000..2294d7b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/ConsolePublisher.java @@ -0,0 +1,87 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * A simple publisher that reads from std in, sending each line as a message. + * @author author + */ +public class ConsolePublisher +{ + public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException + { + // read the hosts(s) from the command line + final String hosts = ( args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com" ); + + // read the topic name from the command line + final String topic = ( args.length > 1 ? args[1] : "TEST-TOPIC" ); + + // read the topic name from the command line + final String partition = ( args.length > 2 ? args[2] : UUID.randomUUID ().toString () ); + + // set up some batch limits and the compression flag + final int maxBatchSize = 100; + final long maxAgeMs = 250; + final boolean withGzip = false; + + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip ); + + final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) ); + try + { + String line = null; + while ( ( line = cin.readLine () ) != null ) + { + pub.send ( partition, line ); + } + } + finally + { + List<message> leftovers = null; + try + { + leftovers = pub.close ( 10, TimeUnit.SECONDS ); + } + catch ( InterruptedException e ) + { + System.err.println ( "Send on close interrupted." ); + } + for ( message m : leftovers ) + { + System.err.println ( "Unsent message: " + m.fMsg ); + } + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java new file mode 100644 index 0000000..6e86d47 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java @@ -0,0 +1,46 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +/** + * + */ +package com.att.nsa.mr.test.clients; + +/** + * @author author + * + */ +public enum ProtocolTypeConstants { + + DME2("DME2"), + AAF_AUTH("HTTPAAF"), + AUTH_KEY("HTTPAUTH"); + + private String value; + + private ProtocolTypeConstants(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java new file mode 100644 index 0000000..8e1c0e0 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SampleConsumer.java @@ -0,0 +1,87 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.IOException; +import java.util.LinkedList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SampleConsumer { + public static void main ( String[] args ) + { + final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); + + + LOG.info("Sample Consumer Class executing"); + final String topic = "com.att.app.dmaap.mr.testingTopic"; + final String url = ( args.length > 1 ? args[1] : "localhost:8181" ); + final String group = ( args.length > 2 ? args[2] :"grp" ); + /*final String id = ( args.length > 3 ? args[3] : "0" );*/ + final String id = ( args.length > 3 ? args[3] : "1" ); + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + final LinkedList<String> urlList = new LinkedList<String> (); + for ( String u : url.split ( "," ) ) + { + urlList.add ( u ); + } + + final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ); + try + { + while ( true ) + { + for ( String msg : cc.fetch () ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + LOG.info ( "" + (++count) + ": " + msg ); + } + + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + //System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + LOG.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + LOG.info ( "" + (++count) + ": consumed message" ); + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java new file mode 100644 index 0000000..36c2b7f --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SamplePublisher.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRClientBuilders.PublisherBuilder; +import com.att.nsa.mr.client.MRPublisher.message; + +public class SamplePublisher { + public static void main ( String[] args ) throws IOException, InterruptedException + { + final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class); + // read the hosts(s) from the command line + final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" ); + + // read the topic name from the command line + //final String topic = ( args.length > 1 ? args[1] : "MY-EXAMPLE-TOPIC" ); + final String topic = ( args.length > 1 ? args[1] : "com.att.app.dmaap.mr.testingTopic" ); + + // set up some batch limits and the compression flag + final int maxBatchSize = 100; + final int maxAgeMs = 250; + final boolean withGzip = false; + + // create our publisher + + final MRBatchingPublisher pub = new PublisherBuilder (). + usingHosts ( hosts ). + onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs). + authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ). + build () + ; + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "name", "tttttttttttttttt" ); + msg1.put ( "greeting", "ooooooooooooooooo" ); + pub.send ( "MyPartitionKey", msg1.toString () ); + + final JSONObject msg2 = new JSONObject (); + msg2.put ( "now", System.currentTimeMillis () ); + pub.send ( "MyOtherPartitionKey", msg2.toString () ); + + // ... + + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + LOG.warn ( stuck.size() + " messages unsent" ); + } + else + { + LOG.info ( "Clean exit; all messages sent." ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java new file mode 100644 index 0000000..3a131a0 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + + + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.Properties; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; + +public class SimpleExampleConsumer +{ + + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public static void main ( String[] args ) + { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + try + { + String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" ); + while ( true ) + { + for ( String msg : cc.fetch () ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + System.out.println(msg); + } + + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java new file mode 100644 index 0000000..b3c167b --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java @@ -0,0 +1,90 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.util.Properties; + +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +public class SimpleExampleConsumerWithReturnResponse { + + + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public static void main ( String[] args ) + { + + long count = 0; + long nextReport = 5000; + + final long startMs = System.currentTimeMillis (); + + try + { + String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" ); + while ( true ) + { + MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse(); + System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode()); + + System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage()); + + System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages()); + /*for ( String msg : mrConsumerResponse.getActualMessages() ) + { + //System.out.println ( "" + (++count) + ": " + msg ); + System.out.println(msg); + }*/ + if ( count > nextReport ) + { + nextReport += 5000; + + final long endMs = System.currentTimeMillis (); + final long elapsedMs = endMs - startMs; + final double elapsedSec = elapsedMs / 1000.0; + final double eps = count / elapsedSec; + System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" ); + } + } + } + catch ( Exception x ) + { + System.err.println ( x.getClass().getName () + ": " + x.getMessage () ); + } + } + +} diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java new file mode 100644 index 0000000..01e770e --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisher.java @@ -0,0 +1,98 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.mr.test.clients; + +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.json.JSONObject; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRPublisher.message; + +/** + * An example of how to use the Java publisher. + * @author author + */ +public class SimpleExamplePublisher +{ + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception + { + + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath); + // publish some messages + final JSONObject msg1 = new JSONObject (); + msg1.put ( "Name", "Sprint" ); + //msg1.put ( "greeting", "Hello .." ); + pub.send ( "First cambria messge" ); + pub.send ( "MyPartitionKey", msg1.toString () ); + + final JSONObject msg2 = new JSONObject (); + //msg2.put ( "mrclient1", System.currentTimeMillis () ); + + + // ... + + // close the publisher to make sure everything's sent before exiting. The batching + // publisher interface allows the app to get the set of unsent messages. It could + // write them to disk, for example, to try to send them later. + final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS ); + if ( stuck.size () > 0 ) + { + System.err.println ( stuck.size() + " messages unsent" ); + } + else + { + System.out.println ( "Clean exit; all messages sent." ); + } + } + + public static void main(String []args) throws InterruptedException, Exception{ + + String routeFilePath="/src/main/resources/dme2/preferredRoute.txt"; + + SimpleExamplePublisher publisher = new SimpleExamplePublisher(); + + + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + publisher.publishMessage("/src/main/resources/dme2/producer.properties"); + } + +} + diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java new file mode 100644 index 0000000..4914688 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java @@ -0,0 +1,84 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.clients; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.json.JSONObject; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.response.MRPublisherResponse; + /** + *An example of how to use the Java publisher. + * @author author + * + */ + public class SimpleExamplePublisherWithResponse + { + static FileWriter routeWriter= null; + static Properties props=null; + static FileReader routeReader=null; + + public static void main(String []args) throws InterruptedException, Exception{ + + String routeFilePath="src/main/resources/dme2/preferredRoute.txt"; + String msgCount = args[0]; + SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse(); + File fo= new File(routeFilePath); + if(!fo.exists()){ + routeWriter=new FileWriter(new File (routeFilePath)); + } + routeReader= new FileReader(new File (routeFilePath)); + props= new Properties(); + int i=0; + while (i< Integer.valueOf(msgCount)) + { + publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount)); + i++; + } + } + + public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException, Exception + { + // create our publisher + final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true); + // publish some messages + final JSONObject msg1 = new JSONObject (); + + msg1.put ( "Partition:1", "Message:"+count); + msg1.put ( "greeting", "Hello .." ); + + + pub.send ( "1", msg1.toString()); + pub.send ( "1", msg1.toString()); + + MRPublisherResponse res= pub.sendBatchWithResponse(); + + System.out.println("Pub response->"+res.toString()); + } + + + } diff --git a/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java b/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java new file mode 100644 index 0000000..3c123b1 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/support/MRBatchingPublisherMock.java @@ -0,0 +1,184 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.support; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; + +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.response.MRPublisherResponse; + +/** + * A helper for unit testing systems that use a MRPublisher. When setting + * up your test, inject an instance into MRClientFactory to have it return + * the mock client. + * + * @author author + * + */ +public class MRBatchingPublisherMock implements MRBatchingPublisher +{ + public class Entry + { + public Entry ( String partition, String msg ) + { + fPartition = partition; + fMessage = msg; + } + + @Override + public String toString () + { + return fMessage; + } + + public final String fPartition; + public final String fMessage; + } + + public MRBatchingPublisherMock () + { + fCaptures = new LinkedList<Entry> (); + } + + public interface Listener + { + void onMessage ( Entry e ); + } + public void addListener ( Listener listener ) + { + fListeners.add ( listener ); + } + + public List<Entry> getCaptures () + { + return getCaptures ( new MessageFilter () { @Override public boolean match ( String msg ) { return true; } } ); + } + + public interface MessageFilter + { + boolean match ( String msg ); + } + + public List<Entry> getCaptures ( MessageFilter filter ) + { + final LinkedList<Entry> result = new LinkedList<Entry> (); + for ( Entry capture : fCaptures ) + { + if ( filter.match ( capture.fMessage ) ) + { + result.add ( capture ); + } + } + return result; + } + + public int received () + { + return fCaptures.size(); + } + + public void reset () + { + fCaptures.clear (); + } + + @Override + public int send ( String partition, String msg ) + { + final Entry e = new Entry ( partition, msg ); + + fCaptures.add ( e ); + for ( Listener l : fListeners ) + { + l.onMessage ( e ); + } + return 1; + } + + @Override + public int send ( message msg ) + { + return send ( msg.fPartition, msg.fMsg ); + } + @Override + public int send ( String msg ) + { + return 1; + + } + + @Override + public int send ( Collection<message> msgs ) + { + int sum = 0; + for ( message m : msgs ) + { + sum += send ( m ); + } + return sum; + } + + @Override + public int getPendingMessageCount () + { + return 0; + } + + @Override + public List<message> close ( long timeout, TimeUnit timeoutUnits ) + { + return new LinkedList<message> (); + } + + @Override + public void close () + { + } + + @Override + public void setApiCredentials ( String apiKey, String apiSecret ) + { + } + + @Override + public void clearApiCredentials () + { + } + + @Override + public void logTo ( Logger log ) + { + } + + private final LinkedList<Entry> fCaptures; + private LinkedList<Listener> fListeners = new LinkedList<Listener> (); + @Override + public MRPublisherResponse sendBatchWithResponse() { + // TODO Auto-generated method stub + return null; + } +} diff --git a/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java b/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java new file mode 100644 index 0000000..3373710 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/test/support/MRConsumerMock.java @@ -0,0 +1,168 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.test.support; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.slf4j.Logger; + +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.response.MRConsumerResponse; + +/** + * A helper for unit testing systems that use a MRConsumer. When setting + * up your test, inject an instance into MRClientFactory to have it return + * the mock client. + * + * @author author + * + */ +public class MRConsumerMock implements MRConsumer +{ + public class Entry + { + public Entry ( long waitMs, int statusCode, List<String> msgs ) + { + fWaitMs = waitMs; + fStatusCode = statusCode; + fStatusMsg = null; + fMsgs = new LinkedList<String> ( msgs ); + } + + public Entry ( long waitMs, int statusCode, String statusMsg ) + { + fWaitMs = waitMs; + fStatusCode = statusCode; + fStatusMsg = statusMsg; + fMsgs = null; + } + + public LinkedList<String> run () throws IOException + { + try + { + Thread.sleep ( fWaitMs ); + if ( fStatusCode >= 200 && fStatusCode <= 299 ) + { + return fMsgs; + } + throw new IOException ( "" + fStatusCode + " " + fStatusMsg ); + } + catch ( InterruptedException e ) + { + throw new IOException ( e ); + } + } + + private final long fWaitMs; + private final int fStatusCode; + private final String fStatusMsg; + private final LinkedList<String> fMsgs; + } + + public MRConsumerMock () + { + fReplies = new LinkedList<Entry> (); + } + + @Override + public void close () + { + } + + @Override + public void setApiCredentials ( String apiKey, String apiSecret ) + { + } + + @Override + public void clearApiCredentials () + { + } + + public synchronized void add ( Entry e ) + { + fReplies.add ( e ); + } + + public void addImmediateMsg ( String msg ) + { + addDelayedMsg ( 0, msg ); + } + + public void addDelayedMsg ( long delay, String msg ) + { + final LinkedList<String> list = new LinkedList<String> (); + list.add ( msg ); + add ( new Entry ( delay, 200, list ) ); + } + + public void addImmediateMsgGroup ( List<String> msgs ) + { + addDelayedMsgGroup ( 0, msgs ); + } + + public void addDelayedMsgGroup ( long delay, List<String> msgs ) + { + final LinkedList<String> list = new LinkedList<String> ( msgs ); + add ( new Entry ( delay, 200, list ) ); + } + + public void addImmediateError ( int statusCode, String statusText ) + { + add ( new Entry ( 0, statusCode, statusText ) ); + } + + @Override + public Iterable<String> fetch () throws IOException + { + return fetch ( -1, -1 ); + } + + @Override + public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException + { + return fReplies.size () > 0 ? fReplies.removeFirst ().run() : new LinkedList<String>(); + } + + @Override + public void logTo ( Logger log ) + { + } + + private final LinkedList<Entry> fReplies; + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse() { + // TODO Auto-generated method stub + return null; + } + + @Override + public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, + int limit) { + // TODO Auto-generated method stub + return null; + } +} |