From ca63da6e0cb7fb63e231343d0b52a40036f6b6aa Mon Sep 17 00:00:00 2001 From: Varun Gudisena Date: Wed, 30 Aug 2017 16:00:10 -0500 Subject: Add Initial Code Import Added initial code for DMaaP Message Router Component Issue-id: DMAAP-76 Change-Id: Ica6f265ea4f2901cf47191f21b4448514ea7c8d4 Signed-off-by: Varun Gudisena --- .../att/sa/cambria/testClient/SimpleExample.java | 335 +++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 src/test/java/com/att/sa/cambria/testClient/SimpleExample.java (limited to 'src/test/java/com/att/sa/cambria/testClient/SimpleExample.java') diff --git a/src/test/java/com/att/sa/cambria/testClient/SimpleExample.java b/src/test/java/com/att/sa/cambria/testClient/SimpleExample.java new file mode 100644 index 0000000..a6ad73e --- /dev/null +++ b/src/test/java/com/att/sa/cambria/testClient/SimpleExample.java @@ -0,0 +1,335 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.sa.cambria.testClient; + +import kafka.api.FetchRequest; +import kafka.api.FetchRequestBuilder; +import kafka.api.PartitionOffsetRequestInfo; +import kafka.cluster.Broker; +import kafka.common.ErrorMapping; +import kafka.common.TopicAndPartition; +import kafka.javaapi.*; +import kafka.javaapi.consumer.SimpleConsumer; +import kafka.message.MessageAndOffset; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class SimpleExample +{ +// public static void main ( String args[] ) +// { +// if ( args.length < 5 ) +// { +// System.err.println ( "usage: SimpleExample " ); +// return; +// } +// +// final long maxReads = Long.parseLong ( args[0] ); +// final String topic = args[1]; +// final int partition = Integer.parseInt ( args[2] ); +// +// final int port = Integer.parseInt ( args[4] ); +// final hostPort hp = new hostPort ( args[3], port ); +// final LinkedList seeds = new LinkedList (); +// seeds.add ( hp ); +// +// try +// { +// final SimpleExample example = new SimpleExample (); +// example.run ( maxReads, topic, partition, seeds ); +// } +// catch ( Exception e ) +// { +// System.out.println ( "Oops:" + e ); +// e.printStackTrace (); +// } +// } +// +// public SimpleExample () +// { +// fReplicaBrokers = new ArrayList (); +// } +// +// public void run ( long remainingAllowedReads, String a_topic, int a_partition, List seedHosts ) throws IOException +// { +// // find the meta data about the topic and partition we are interested in +// +// hostPort leadBroker = findLeader ( seedHosts, a_topic, a_partition ); +// if ( leadBroker == null ) +// { +// System.out.println ( "Can't find leader for Topic and Partition. Exiting" ); +// return; +// } +// +// final String clientName = "Client_" + a_topic + "_" + a_partition; +// +// SimpleConsumer consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName ); +// long readOffset = getLastOffset ( consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime (), clientName ); +// +// int numErrors = 0; +// while ( remainingAllowedReads > 0 ) +// { +// if ( consumer == null ) +// { +// consumer = new SimpleConsumer ( leadBroker.fHost, leadBroker.fPort, 100000, 64 * 1024, clientName ); +// } +// +// final FetchRequest req = new FetchRequestBuilder () +// .clientId ( clientName ) +// .addFetch ( a_topic, a_partition, readOffset, 100000 ).build (); +// final FetchResponse fetchResponse = consumer.fetch ( req ); +// +// if ( fetchResponse.hasError () ) +// { +// numErrors++; +// +// // Something went wrong! +// final short code = fetchResponse.errorCode ( a_topic, a_partition ); +// System.out.println ( "Error fetching data from the Broker:" + leadBroker + " Reason: " + code ); +// if ( numErrors > 5 ) +// break; +// +// if ( code == ErrorMapping.OffsetOutOfRangeCode () ) +// { +// // We asked for an invalid offset. For simple case ask for +// // the last element to reset +// readOffset = getLastOffset ( consumer, a_topic, +// a_partition, kafka.api.OffsetRequest.LatestTime (), +// clientName ); +// continue; +// } +// +// consumer.close (); +// consumer = null; +// +// leadBroker = findNewLeader ( leadBroker, a_topic, a_partition ); +// continue; +// } +// numErrors = 0; +// +// long numRead = 0; +// for ( MessageAndOffset messageAndOffset : fetchResponse.messageSet ( a_topic, a_partition ) ) +// { +// long currentOffset = messageAndOffset.offset (); +// if ( currentOffset < readOffset ) +// { +// System.out.println ( "Found an old offset: " +// + currentOffset + " Expecting: " + readOffset ); +// continue; +// } +// readOffset = messageAndOffset.nextOffset (); +// ByteBuffer payload = messageAndOffset.message ().payload (); +// +// byte[] bytes = new byte [payload.limit ()]; +// payload.get ( bytes ); +// System.out.println ( String.valueOf ( messageAndOffset.offset () ) + ": " + new String ( bytes, "UTF-8" ) ); +// numRead++; +// remainingAllowedReads--; +// } +// +// if ( numRead == 0 ) +// { +// try +// { +// Thread.sleep ( 1000 ); +// } +// catch ( InterruptedException ie ) +// { +// } +// } +// } +// +// if ( consumer != null ) +// { +// consumer.close (); +// } +// } +// +// public static long getLastOffset ( SimpleConsumer consumer, String topic, +// int partition, long whichTime, String clientName ) +// { +// TopicAndPartition topicAndPartition = new TopicAndPartition ( topic, +// partition ); +// Map requestInfo = new HashMap (); +// requestInfo.put ( topicAndPartition, new PartitionOffsetRequestInfo ( +// whichTime, 1 ) ); +// kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest ( +// requestInfo, kafka.api.OffsetRequest.CurrentVersion (), clientName ); +// OffsetResponse response = consumer.getOffsetsBefore ( request ); +// +// if ( response.hasError () ) +// { +// System.out.println ( "Error fetching data Offset Data the Broker. Reason: " +// + response.errorCode ( topic, partition ) ); +// return 0; +// } +// +// final long[] offsets = response.offsets ( topic, partition ); +// return offsets[0]; +// } +// +// /** +// * Find a new leader for a topic/partition, including a pause for the coordinator to +// * find a new leader, as needed. +// * +// * @param oldLeader +// * @param topic +// * @param partition +// * @return +// * @throws IOException +// */ +// private hostPort findNewLeader ( hostPort oldLeader, String topic, int partition ) throws IOException +// { +// try +// { +// int attemptsLeft = 3; +// boolean haveSlept = false; +// +// while ( attemptsLeft-- > 0 ) +// { +// System.out.println ( "" + attemptsLeft + " attempts Left" ); // FIXME: make sure it's 3 attempts! +// +// // ask the brokers for a leader +// final hostPort newLeader = findLeader ( fReplicaBrokers, topic, partition ); +// if ( newLeader != null ) +// { +// // we can use this leader if it's different (i.e. a new leader has been elected) +// // or it's the same leader, but we waited to allow ZK to get a new one, and +// // the original recovered +// if ( !oldLeader.equals ( newLeader ) || haveSlept ) +// { +// return newLeader; +// } +// } +// +// // sleep +// haveSlept = true; +// Thread.sleep ( 1000 ); +// } +// } +// catch ( InterruptedException x ) +// { +// // just give up +// } +// +// System.out.println ( "Unable to find new leader after Broker failure. Exiting" ); +// throw new IOException ( "Unable to find new leader after Broker failure. Exiting" ); +// } +// +// /** +// * Given one or more seed brokers, find the leader for a given topic/partition +// * @param seeds +// * @param topic +// * @param partition +// * @return partition metadata, or null +// */ +// private hostPort findLeader ( List seeds, String topic, int partition ) +// { +// final List topics = new ArrayList (); +// topics.add ( topic ); +// +// for ( hostPort seed : seeds ) +// { +// final SimpleConsumer consumer = new SimpleConsumer ( seed.fHost, seed.fPort, 100000, 64 * 1024, "leaderLookup" ); +// final TopicMetadataRequest req = new TopicMetadataRequest ( topics ); +// final TopicMetadataResponse resp = consumer.send ( req ); +// consumer.close (); +// +// final List metaData = resp.topicsMetadata (); +// for ( TopicMetadata item : metaData ) +// { +// for ( PartitionMetadata part : item.partitionsMetadata () ) +// { +// if ( part.partitionId () == partition ) +// { +// // found our partition. load the details, then return it +// fReplicaBrokers.clear (); +// for ( kafka.cluster.Broker replica : part.replicas () ) +// { +// fReplicaBrokers.add ( new hostPort ( replica.host (), replica.port () ) ); +// } +// return new hostPort ( part.leader () ); +// } +// } +// } +// } +// +// return null; +// } +// +// private static class hostPort +// { +// public hostPort ( String host, int port ) { fHost = host; fPort = port; } +// +// public hostPort ( Broker leader ) +// { +// fHost = leader.host (); +// fPort = leader.port (); +// } +// +// +// public final String fHost; +// public final int fPort; +// +// @Override +// public int hashCode () +// { +// final int prime = 31; +// int result = 1; +// result = prime * result +// + ( ( fHost == null ) ? 0 : fHost.hashCode () ); +// result = prime * result + fPort; +// return result; +// } +// +// @Override +// public boolean equals ( Object obj ) +// { +// if ( this == obj ) +// return true; +// if ( obj == null ) +// return false; +// if ( getClass () != obj.getClass () ) +// return false; +// hostPort other = (hostPort) obj; +// if ( fHost == null ) +// { +// if ( other.fHost != null ) +// return false; +// } +// else if ( !fHost.equals ( other.fHost ) ) +// return false; +// if ( fPort != other.fPort ) +// return false; +// return true; +// } +// } +// +// private List fReplicaBrokers; +} -- cgit 1.2.3-korg