From 3504265229c589ecc166e3ad4c33bb198b11e4ce Mon Sep 17 00:00:00 2001 From: sunil unnava Date: Tue, 23 Oct 2018 12:18:59 -0400 Subject: update the package name Issue-ID: DMAAP-858 Change-Id: I49ae6eb9c51a261b64b911e607fcbbca46c5423c Signed-off-by: sunil unnava --- .../att/sa/cambria/testClient/SimpleExample.java | 335 --------------------- 1 file changed, 335 deletions(-) delete mode 100644 src/test/java/com/att/sa/cambria/testClient/SimpleExample.java (limited to 'src/test/java/com/att/sa/cambria/testClient/SimpleExample.java') diff --git a/src/test/java/com/att/sa/cambria/testClient/SimpleExample.java b/src/test/java/com/att/sa/cambria/testClient/SimpleExample.java deleted file mode 100644 index a6ad73e..0000000 --- a/src/test/java/com/att/sa/cambria/testClient/SimpleExample.java +++ /dev/null @@ -1,335 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -package com.att.sa.cambria.testClient; - -import kafka.api.FetchRequest; -import kafka.api.FetchRequestBuilder; -import kafka.api.PartitionOffsetRequestInfo; -import kafka.cluster.Broker; -import kafka.common.ErrorMapping; -import kafka.common.TopicAndPartition; -import kafka.javaapi.*; -import kafka.javaapi.consumer.SimpleConsumer; -import kafka.message.MessageAndOffset; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class SimpleExample -{ -// public static void main ( String args[] ) -// { -// if ( args.length < 5 ) -// { -// System.err.println ( "usage: SimpleExample " ); -// return; -// } -// -// final long maxReads = Long.parseLong ( args[0] ); -// final String topic = args[1]; -// final int partition = Integer.parseInt ( args[2] ); -// -// final int port = Integer.parseInt ( args[4] ); -// final hostPort hp = new hostPort ( args[3], port ); -// final LinkedList seeds = new LinkedList (); -// seeds.add ( hp ); -// -// try -// { -// final SimpleExample example = new SimpleExample (); -// example.run ( maxReads, topic, partition, seeds ); -// } -// catch ( Exception e ) -// { -// System.out.println ( "Oops:" + e ); -// e.printStackTrace (); -// } -// } -// -// public SimpleExample () -// { -// fReplicaBrokers = new ArrayList (); -// } -// -// public void run ( long remainingAllowedReads, String a_topic, int a_partition, List seedHosts ) throws IOException -// { -// // find the meta data about the topic and partition we are interested in -// -// hostPort leadBroker = findLeader ( seedHosts, a_topic, a_partition ); -// if ( leadBroker == null ) -// { -// System.out.println ( "Can't find leader for Topic and Partition. Exiting" ); -// return; -// } -// -// final String clientName = "Client_" + a_topic + "_" + a_partition; -// -// SimpleConsumer consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName ); -// long readOffset = getLastOffset ( consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime (), clientName ); -// -// int numErrors = 0; -// while ( remainingAllowedReads > 0 ) -// { -// if ( consumer == null ) -// { -// consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName ); -// } -// -// final FetchRequest req = new FetchRequestBuilder () -// .clientId ( clientName ) -// .addFetch ( a_topic, a_partition, readOffset, 100000 ).build (); -// final FetchResponse fetchResponse = consumer.fetch ( req ); -// -// if ( fetchResponse.hasError () ) -// { -// numErrors++; -// -// // Something went wrong! -// final short code = fetchResponse.errorCode ( a_topic, a_partition ); -// System.out.println ( "Error fetching data from the Broker:" + leadBroker + " Reason: " + code ); -// if ( numErrors > 5 ) -// break; -// -// if ( code == ErrorMapping.OffsetOutOfRangeCode () ) -// { -// // We asked for an invalid offset. For simple case ask for -// // the last element to reset -// readOffset = getLastOffset ( consumer, a_topic, -// a_partition, kafka.api.OffsetRequest.LatestTime (), -// clientName ); -// continue; -// } -// -// consumer.close (); -// consumer = null; -// -// leadBroker = findNewLeader ( leadBroker, a_topic, a_partition ); -// continue; -// } -// numErrors = 0; -// -// long numRead = 0; -// for ( MessageAndOffset messageAndOffset : fetchResponse.messageSet ( a_topic, a_partition ) ) -// { -// long currentOffset = messageAndOffset.offset (); -// if ( currentOffset < readOffset ) -// { -// System.out.println ( "Found an old offset: " -// + currentOffset + " Expecting: " + readOffset ); -// continue; -// } -// readOffset = messageAndOffset.nextOffset (); -// ByteBuffer payload = messageAndOffset.message ().payload (); -// -// byte[] bytes = new byte [payload.limit ()]; -// payload.get ( bytes ); -// System.out.println ( String.valueOf ( messageAndOffset.offset () ) + ": " + new String ( bytes, "UTF-8" ) ); -// numRead++; -// remainingAllowedReads--; -// } -// -// if ( numRead == 0 ) -// { -// try -// { -// Thread.sleep ( 1000 ); -// } -// catch ( InterruptedException ie ) -// { -// } -// } -// } -// -// if ( consumer != null ) -// { -// consumer.close (); -// } -// } -// -// public static long getLastOffset ( SimpleConsumer consumer, String topic, -// int partition, long whichTime, String clientName ) -// { -// TopicAndPartition topicAndPartition = new TopicAndPartition ( topic, -// partition ); -// Map requestInfo = new HashMap (); -// requestInfo.put ( topicAndPartition, new PartitionOffsetRequestInfo ( -// whichTime, 1 ) ); -// kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest ( -// requestInfo, kafka.api.OffsetRequest.CurrentVersion (), clientName ); -// OffsetResponse response = consumer.getOffsetsBefore ( request ); -// -// if ( response.hasError () ) -// { -// System.out.println ( "Error fetching data Offset Data the Broker. Reason: " -// + response.errorCode ( topic, partition ) ); -// return 0; -// } -// -// final long[] offsets = response.offsets ( topic, partition ); -// return offsets[0]; -// } -// -// /** -// * Find a new leader for a topic/partition, including a pause for the coordinator to -// * find a new leader, as needed. -// * -// * @param oldLeader -// * @param topic -// * @param partition -// * @return -// * @throws IOException -// */ -// private hostPort findNewLeader ( hostPort oldLeader, String topic, int partition ) throws IOException -// { -// try -// { -// int attemptsLeft = 3; -// boolean haveSlept = false; -// -// while ( attemptsLeft-- > 0 ) -// { -// System.out.println ( "" + attemptsLeft + " attempts Left" ); // FIXME: make sure it's 3 attempts! -// -// // ask the brokers for a leader -// final hostPort newLeader = findLeader ( fReplicaBrokers, topic, partition ); -// if ( newLeader != null ) -// { -// // we can use this leader if it's different (i.e. a new leader has been elected) -// // or it's the same leader, but we waited to allow ZK to get a new one, and -// // the original recovered -// if ( !oldLeader.equals ( newLeader ) || haveSlept ) -// { -// return newLeader; -// } -// } -// -// // sleep -// haveSlept = true; -// Thread.sleep ( 1000 ); -// } -// } -// catch ( InterruptedException x ) -// { -// // just give up -// } -// -// System.out.println ( "Unable to find new leader after Broker failure. Exiting" ); -// throw new IOException ( "Unable to find new leader after Broker failure. Exiting" ); -// } -// -// /** -// * Given one or more seed brokers, find the leader for a given topic/partition -// * @param seeds -// * @param topic -// * @param partition -// * @return partition metadata, or null -// */ -// private hostPort findLeader ( List seeds, String topic, int partition ) -// { -// final List topics = new ArrayList (); -// topics.add ( topic ); -// -// for ( hostPort seed : seeds ) -// { -// final SimpleConsumer consumer = new SimpleConsumer ( seed.fHost, seed.fPort, 100000, 64 * 1024, "leaderLookup" ); -// final TopicMetadataRequest req = new TopicMetadataRequest ( topics ); -// final TopicMetadataResponse resp = consumer.send ( req ); -// consumer.close (); -// -// final List metaData = resp.topicsMetadata (); -// for ( TopicMetadata item : metaData ) -// { -// for ( PartitionMetadata part : item.partitionsMetadata () ) -// { -// if ( part.partitionId () == partition ) -// { -// // found our partition. load the details, then return it -// fReplicaBrokers.clear (); -// for ( kafka.cluster.Broker replica : part.replicas () ) -// { -// fReplicaBrokers.add ( new hostPort ( replica.host (), replica.port () ) ); -// } -// return new hostPort ( part.leader () ); -// } -// } -// } -// } -// -// return null; -// } -// -// private static class hostPort -// { -// public hostPort ( String host, int port ) { fHost = host; fPort = port; } -// -// public hostPort ( Broker leader ) -// { -// fHost = leader.host (); -// fPort = leader.port (); -// } -// -// -// public final String fHost; -// public final int fPort; -// -// @Override -// public int hashCode () -// { -// final int prime = 31; -// int result = 1; -// result = prime * result -// + ( ( fHost == null ) ? 0 : fHost.hashCode () ); -// result = prime * result + fPort; -// return result; -// } -// -// @Override -// public boolean equals ( Object obj ) -// { -// if ( this == obj ) -// return true; -// if ( obj == null ) -// return false; -// if ( getClass () != obj.getClass () ) -// return false; -// hostPort other = (hostPort) obj; -// if ( fHost == null ) -// { -// if ( other.fHost != null ) -// return false; -// } -// else if ( !fHost.equals ( other.fHost ) ) -// return false; -// if ( fPort != other.fPort ) -// return false; -// return true; -// } -// } -// -// private List fReplicaBrokers; -} -- cgit 1.2.3-korg