aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr/test')
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java95
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java44
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java87
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java85
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java84
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java98
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java97
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java82
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/support/MRBatchingPublisherMock.java184
-rw-r--r--src/main/java/org/onap/dmaap/mr/test/support/MRConsumerMock.java169
10 files changed, 0 insertions, 1025 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java
deleted file mode 100644
index d8b1979..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package org.onap.dmaap.mr.test.clients;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRPublisher.message;
-
-/**
- * A simple publisher that reads from std in, sending each line as a message.
- * @author author
- */
-public class ConsolePublisher
-{
-
- private static final Logger logger = LoggerFactory.getLogger(ConsolePublisher.class);
- private ConsolePublisher() {
- }
- public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException
- {
- // read the hosts(s) from the command line
- final String hosts = args.length > 0 ? args[0] : "mr1.onap.com,mr2.onap.com,mr3.onap.com";
-
- // read the topic name from the command line
- final String topic = args.length > 1 ? args[1] : "TEST-TOPIC";
-
- // read the topic name from the command line
- final String partition = args.length > 2 ? args[2] : UUID.randomUUID ().toString ();
-
- // set up some batch limits and the compression flag
- final int maxBatchSize = 100;
- final long maxAgeMs = 250;
- final boolean withGzip = false;
-
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip );
-
- final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) );
- try
- {
- String line = null;
- while ( ( line = cin.readLine () ) != null )
- {
- pub.send ( partition, line );
- }
- }
- finally
- {
- List<message> leftovers = null;
- try
- {
- leftovers = pub.close ( 10, TimeUnit.SECONDS );
- }
- catch ( InterruptedException e )
- {
- logger.error( "Send on close interrupted." );
- Thread.currentThread().interrupt();
- }
- for ( message m : leftovers )
- {
- logger.error( "Unsent message: " + m.fMsg );
- }
- }
- }
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java
deleted file mode 100644
index 6ece7f7..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/ProtocolTypeConstants.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-/**
- *
- */
-package org.onap.dmaap.mr.test.clients;
-
-/**
- * @author author
- *
- */
-public enum ProtocolTypeConstants {
-
- DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH");
-
- private String value;
-
- private ProtocolTypeConstants(String value) {
- this.value = value;
- }
-
- public String getValue() {
- return value;
- }
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java b/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java
deleted file mode 100644
index eb96780..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.test.clients;
-
-import java.util.LinkedList;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-
-public class SampleConsumer {
- private SampleConsumer() {
- }
- public static void main ( String[] args )
- {
- final Logger log = LoggerFactory.getLogger(SampleConsumer.class);
-
-
- log.info("Sample Consumer Class executing");
- final String topic = "org.onap.dmaap.mr.testingTopic";
- final String url = ( args.length > 1 ? args[1] : "localhost:8181" );
- final String group = ( args.length > 2 ? args[2] :"grp" );
-
- final String id = ( args.length > 3 ? args[3] : "1" );
-
- long count = 0;
- long nextReport = 5000;
-
- final long startMs = System.currentTimeMillis ();
-
- final LinkedList<String> urlList = new LinkedList<> ();
- for ( String u : url.split ( "," ) )
- {
- urlList.add ( u );
- }
-
- final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10*1000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" );
- try
- {
- while ( true )
- {
- for ( String msg : cc.fetch () )
- {
- log.info ( "" + (++count) + ": " + msg );
- }
-
- if ( count > nextReport )
- {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis ();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- log.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
- }
- log.info ( "" + (++count) + ": consumed message" );
- }
- }
- catch ( Exception x )
- {
- log.error( x.getClass().getName () + ": " + x.getMessage () );
- throw new IllegalArgumentException(x);
- }
- }
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java
deleted file mode 100644
index f857afd..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.test.clients;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientBuilders.PublisherBuilder;
-import org.onap.dmaap.mr.client.MRPublisher.message;
-
-public class SamplePublisher {
- public static void main ( String[] args ) throws IOException, InterruptedException
- {
- final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class);
- // read the hosts(s) from the command line
- final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" );
-
- // read the topic name from the command line
-
- final String topic = ( args.length > 1 ? args[1] : "org.onap.dmaap.mr.testingTopic" );
-
- // set up some batch limits and the compression flag
- final int maxBatchSize = 100;
- final int maxAgeMs = 250;
- final boolean withGzip = false;
-
- // create our publisher
-
- final MRBatchingPublisher pub = new PublisherBuilder ().
- usingHosts ( hosts ).
- onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs).
- authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ).
- build ()
- ;
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
- msg1.put ( "name", "tttttttttttttttt" );
- msg1.put ( "greeting", "ooooooooooooooooo" );
- pub.send ( "MyPartitionKey", msg1.toString () );
-
- final JSONObject msg2 = new JSONObject ();
- msg2.put ( "now", System.currentTimeMillis () );
- pub.send ( "MyOtherPartitionKey", msg2.toString () );
-
- // ...
-
- // close the publisher to make sure everything's sent before exiting. The batching
- // publisher interface allows the app to get the set of unsent messages. It could
- // write them to disk, for example, to try to send them later.
- final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
- if ( stuck.isEmpty())
- {
- LOG.warn ( stuck.size() + " messages unsent" );
- }
- else
- {
- LOG.info ( "Clean exit; all messages sent." );
- }
- }
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java
deleted file mode 100644
index a5c0c3a..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package org.onap.dmaap.mr.test.clients;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-
-public class SimpleExampleConsumer {
-
- static FileWriter routeWriter = null;
- static Properties props = null;
- static FileReader routeReader = null;
-
- public static void main(String[] args) {
- final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class);
-
- long count = 0;
- long nextReport = 5000;
-
- final long startMs = System.currentTimeMillis();
-
- try {
- String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
-
- File fo = new File(routeFilePath);
- if (!fo.exists()) {
- routeWriter = new FileWriter(new File(routeFilePath));
- }
- routeReader = new FileReader(new File(routeFilePath));
- props = new Properties();
- final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties");
- int i = 0;
- while (i < 10) {
- Thread.sleep(2);
- i++;
- for (String msg : cc.fetch()) {
-
- System.out.println(msg);
- }
-
- if (count > nextReport) {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
- }
- }
- } catch (Exception x) {
- System.err.println(x.getClass().getName() + ": " + x.getMessage());
- LOG.error("exception: ", x);
- }
- }
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
deleted file mode 100644
index f7341ec..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.test.clients;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-
-public class SimpleExampleConsumerWithReturnResponse {
-
- private static final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumerWithReturnResponse.class);
-
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
- public static void main ( String[] args )
- {
-
- long count = 0;
- long nextReport = 5000;
- // remove while true and limite execution time in seconds
- int timeMax = 86400; // one day
- long endDate = System.currentTimeMillis() + timeMax*1000;
-
- final long startMs = System.currentTimeMillis ();
-
- try
- {
- String routeFilePath="src/main/resources/dme2/preferredRoute.txt";
-
-
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- final MRConsumer cc = MRClientFactory.createConsumer ( "src/main/resources/dme2/consumer.properties" );
- while ( System.currentTimeMillis() < endDate )
- {
- MRConsumerResponse mrConsumerResponse = cc.fetchWithReturnConsumerResponse();
- System.out.println("mrConsumerResponse code :"+mrConsumerResponse.getResponseCode());
-
- System.out.println("mrConsumerResponse Message :"+mrConsumerResponse.getResponseMessage());
-
- System.out.println("mrConsumerResponse ActualMessage :"+mrConsumerResponse.getActualMessages());
- /*for ( String msg : mrConsumerResponse.getActualMessages() )
- {
- //System.out.println ( "" + (++count) + ": " + msg );
- System.out.println(msg);
- }*/
- if ( count > nextReport )
- {
- nextReport += 5000;
-
- final long endMs = System.currentTimeMillis ();
- final long elapsedMs = endMs - startMs;
- final double elapsedSec = elapsedMs / 1000.0;
- final double eps = count / elapsedSec;
- System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
- }
- }
- }
- catch ( Exception x )
- {
- System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
- LOG.error("exception: ", x);
- }
- }
-
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java
deleted file mode 100644
index 50fd22c..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisher.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-
-package org.onap.dmaap.mr.test.clients;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import org.json.JSONObject;
-
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRPublisher.message;
-
-/**
- * An example of how to use the Java publisher.
- * @author author
- */
-public class SimpleExamplePublisher
-{
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
- public void publishMessage ( String producerFilePath ) throws IOException, InterruptedException, Exception
- {
-
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath);
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
- msg1.put ( "Name", "Sprint" );
-
- pub.send ( "First cambria messge" );
- pub.send ( "MyPartitionKey", msg1.toString () );
-
- final JSONObject msg2 = new JSONObject ();
-
-
-
- // ...
-
- // close the publisher to make sure everything's sent before exiting. The batching
- // publisher interface allows the app to get the set of unsent messages. It could
- // write them to disk, for example, to try to send them later.
- final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
- if ( stuck.isEmpty() )
- {
- System.err.println ( stuck.size() + " messages unsent" );
- }
- else
- {
- System.out.println ( "Clean exit; all messages sent." );
- }
- }
-
- public static void main(String []args) throws InterruptedException, Exception{
-
- String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
-
- SimpleExamplePublisher publisher = new SimpleExamplePublisher();
-
-
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- publisher.publishMessage("/src/main/resources/dme2/producer.properties");
- }
-
-}
-
diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java
deleted file mode 100644
index d2b77ef..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/clients/SimpleExamplePublisherWithResponse.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.test.clients;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Properties;
-import org.json.JSONObject;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
- /**
- *An example of how to use the Java publisher.
- * @author author
- *
- */
- public class SimpleExamplePublisherWithResponse
- {
- static FileWriter routeWriter= null;
- static Properties props=null;
- static FileReader routeReader=null;
-
- public static void main(String []args) throws InterruptedException, Exception{
-
- String routeFilePath="src/main/resources/dme2/preferredRoute.txt";
- String msgCount = args[0];
- SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse();
- File fo= new File(routeFilePath);
- if(!fo.exists()){
- routeWriter=new FileWriter(new File (routeFilePath));
- }
- routeReader= new FileReader(new File (routeFilePath));
- props= new Properties();
- int i=0;
- while (i< Integer.valueOf(msgCount))
- {
- publisher.publishMessage("src/main/resources/dme2/producer.properties",Integer.valueOf(msgCount));
- i++;
- }
- }
-
- public void publishMessage ( String producerFilePath , int count ) throws IOException, InterruptedException
- {
- // create our publisher
- final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher (producerFilePath,true);
- // publish some messages
- final JSONObject msg1 = new JSONObject ();
-
- msg1.put ( "Partition:1", "Message:"+count);
- msg1.put ( "greeting", "Hello .." );
-
-
- pub.send ( "1", msg1.toString());
- pub.send ( "1", msg1.toString());
-
- MRPublisherResponse res= pub.sendBatchWithResponse();
-
- System.out.println("Pub response->"+res.toString());
- }
-
-
- }
diff --git a/src/main/java/org/onap/dmaap/mr/test/support/MRBatchingPublisherMock.java b/src/main/java/org/onap/dmaap/mr/test/support/MRBatchingPublisherMock.java
deleted file mode 100644
index 3e64c35..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/support/MRBatchingPublisherMock.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.test.support;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-
-/**
- * A helper for unit testing systems that use a MRPublisher. When setting
- * up your test, inject an instance into MRClientFactory to have it return
- * the mock client.
- *
- * @author author
- *
- */
-public class MRBatchingPublisherMock implements MRBatchingPublisher
-{
- public class Entry
- {
- public Entry ( String partition, String msg )
- {
- fPartition = partition;
- fMessage = msg;
- }
-
- @Override
- public String toString ()
- {
- return fMessage;
- }
-
- public final String fPartition;
- public final String fMessage;
- }
-
- public MRBatchingPublisherMock ()
- {
- fCaptures = new LinkedList<> ();
- }
-
- public interface Listener
- {
- void onMessage ( Entry e );
- }
- public void addListener ( Listener listener )
- {
- fListeners.add ( listener );
- }
-
- public List<Entry> getCaptures ()
- {
- return getCaptures ( new MessageFilter () { @Override public boolean match ( String msg ) { return true; } } );
- }
-
- public interface MessageFilter
- {
- boolean match ( String msg );
- }
-
- public List<Entry> getCaptures ( MessageFilter filter )
- {
- final LinkedList<Entry> result = new LinkedList<> ();
- for ( Entry capture : fCaptures )
- {
- if ( filter.match ( capture.fMessage ) )
- {
- result.add ( capture );
- }
- }
- return result;
- }
-
- public int received ()
- {
- return fCaptures.size();
- }
-
- public void reset ()
- {
- fCaptures.clear ();
- }
-
- @Override
- public int send ( String partition, String msg )
- {
- final Entry e = new Entry ( partition, msg );
-
- fCaptures.add ( e );
- for ( Listener l : fListeners )
- {
- l.onMessage ( e );
- }
- return 1;
- }
-
- @Override
- public int send ( message msg )
- {
- return send ( msg.fPartition, msg.fMsg );
- }
- @Override
- public int send ( String msg )
- {
- return 1;
-
- }
-
- @Override
- public int send ( Collection<message> msgs )
- {
- int sum = 0;
- for ( message m : msgs )
- {
- sum += send ( m );
- }
- return sum;
- }
-
- @Override
- public int getPendingMessageCount ()
- {
- return 0;
- }
-
- @Override
- public List<message> close ( long timeout, TimeUnit timeoutUnits )
- {
- return new LinkedList<> ();
- }
-
- @Override
- public void close ()
- {
- }
-
- @Override
- public void setApiCredentials ( String apiKey, String apiSecret )
- {
- }
-
- @Override
- public void clearApiCredentials ()
- {
- }
-
- @Override
- public void logTo ( Logger log )
- {
- }
-
- private final LinkedList<Entry> fCaptures;
- private LinkedList<Listener> fListeners = new LinkedList<> ();
- @Override
- public MRPublisherResponse sendBatchWithResponse() {
- // TODO Auto-generated method stub
- return null;
- }
-}
diff --git a/src/main/java/org/onap/dmaap/mr/test/support/MRConsumerMock.java b/src/main/java/org/onap/dmaap/mr/test/support/MRConsumerMock.java
deleted file mode 100644
index afa36ea..0000000
--- a/src/main/java/org/onap/dmaap/mr/test/support/MRConsumerMock.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*******************************************************************************
- * ============LICENSE_START=======================================================
- * org.onap.dmaap
- * ================================================================================
- * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- *
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
- *******************************************************************************/
-package org.onap.dmaap.mr.test.support;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.slf4j.Logger;
-
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-
-/**
- * A helper for unit testing systems that use a MRConsumer. When setting
- * up your test, inject an instance into MRClientFactory to have it return
- * the mock client.
- *
- * @author author
- *
- */
-public class MRConsumerMock implements MRConsumer
-{
- public class Entry
- {
- public Entry ( long waitMs, int statusCode, List<String> msgs )
- {
- fWaitMs = waitMs;
- fStatusCode = statusCode;
- fStatusMsg = null;
- fMsgs = new LinkedList<> ( msgs );
- }
-
- public Entry ( long waitMs, int statusCode, String statusMsg )
- {
- fWaitMs = waitMs;
- fStatusCode = statusCode;
- fStatusMsg = statusMsg;
- fMsgs = null;
- }
-
- public LinkedList<String> run () throws IOException
- {
- try
- {
- Thread.sleep ( fWaitMs );
- if ( fStatusCode >= 200 && fStatusCode <= 299 )
- {
- return fMsgs;
- }
- throw new IOException ( "" + fStatusCode + " " + fStatusMsg );
- }
- catch ( InterruptedException e )
- {
- Thread.currentThread().interrupt();
- throw new IOException ( e );
- }
- }
-
- private final long fWaitMs;
- private final int fStatusCode;
- private final String fStatusMsg;
- private final LinkedList<String> fMsgs;
- }
-
- public MRConsumerMock ()
- {
- fReplies = new LinkedList<> ();
- }
-
- @Override
- public void close ()
- {
- }
-
- @Override
- public void setApiCredentials ( String apiKey, String apiSecret )
- {
- }
-
- @Override
- public void clearApiCredentials ()
- {
- }
-
- public synchronized void add ( Entry e )
- {
- fReplies.add ( e );
- }
-
- public void addImmediateMsg ( String msg )
- {
- addDelayedMsg ( 0, msg );
- }
-
- public void addDelayedMsg ( long delay, String msg )
- {
- final LinkedList<String> list = new LinkedList<> ();
- list.add ( msg );
- add ( new Entry ( delay, 200, list ) );
- }
-
- public void addImmediateMsgGroup ( List<String> msgs )
- {
- addDelayedMsgGroup ( 0, msgs );
- }
-
- public void addDelayedMsgGroup ( long delay, List<String> msgs )
- {
- final LinkedList<String> list = new LinkedList<> ( msgs );
- add ( new Entry ( delay, 200, list ) );
- }
-
- public void addImmediateError ( int statusCode, String statusText )
- {
- add ( new Entry ( 0, statusCode, statusText ) );
- }
-
- @Override
- public Iterable<String> fetch () throws IOException
- {
- return fetch ( -1, -1 );
- }
-
- @Override
- public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException
- {
- return fReplies.size () > 0 ? fReplies.removeFirst ().run() : new LinkedList<String>();
- }
-
- @Override
- public void logTo ( Logger log )
- {
- }
-
- private final LinkedList<Entry> fReplies;
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
- int limit) {
- // TODO Auto-generated method stub
- return null;
- }
-}