diff options
author | sunil unnava <su622b@att.com> | 2018-06-20 17:08:42 -0400 |
---|---|---|
committer | sunil unnava <su622b@att.com> | 2018-06-20 17:12:36 -0400 |
commit | 1ee517bedb7c1634e37c1124676f001b95840985 (patch) | |
tree | fbbc9c09fe0424224c22430c537c3e6126d98c03 /src/main/java | |
parent | 669941433fe0d154e11161e0a1095518a6199b06 (diff) |
changes for kafka upgrade
Issue-ID: DMAAP-513
Change-Id: I614ff2919e28c3194eab6bb731d076d9c91be1d7
Signed-off-by: sunil unnava <su622b@att.com>
Diffstat (limited to 'src/main/java')
8 files changed, 162 insertions, 80 deletions
diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java index 71bd85c..977caae 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent; import java.io.BufferedReader; @@ -35,10 +34,9 @@ import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Properties; - -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.jasypt.util.text.BasicTextEncryptor; +//import org.json.JSONObject; +//import org.apache.log4j.Logger; +//import org.jasypt.util.text.BasicTextEncryptor; import com.att.nsa.dmaapMMAgent.dao.CreateMirrorMaker; import com.att.nsa.dmaapMMAgent.dao.DeleteMirrorMaker; @@ -48,6 +46,7 @@ import com.att.nsa.dmaapMMAgent.dao.UpdateMirrorMaker; import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList; import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler; import com.google.gson.Gson; +import com.google.gson.JsonArray; import com.google.gson.internal.LinkedTreeMap; import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; @@ -62,6 +61,7 @@ public class MirrorMakerAgent {/* String topicname = ""; String mechid = ""; String password = ""; + String grepLog = ""; private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; public static void main(String[] args) { @@ -81,7 +81,12 @@ public class MirrorMakerAgent {/* MirrorMakerAgent agent = new MirrorMakerAgent(); if (agent.checkStartup()) { logger.info("mmagent started, loading properties"); - agent.checkAgentProcess(); + try { + agent.checkAgentProcess(); + } catch (Exception e) { + + e.printStackTrace(); + } agent.readAgentTopic(); } else { System.out.println( @@ -96,14 +101,14 @@ public class MirrorMakerAgent {/* input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config"); } catch (IOException ex) { - logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex); + logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file"); return false; } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error(" IOException occers " + e); + e.printStackTrace(); } } } @@ -111,19 +116,16 @@ public class MirrorMakerAgent {/* input = null; try { input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh"); - if(false) { - throw new IOException(); - } logger.info("kakahome is set :" + kafkahome); } catch (IOException ex) { - logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly" + ex); + logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly"); return false; } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error("IOException" + e); + e.printStackTrace(); } } } @@ -144,38 +146,41 @@ public class MirrorMakerAgent {/* return true; } - private void checkPropertiesFile(String agentName, String propName, String info, boolean refresh) { + private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) { InputStream input = null; OutputStream out = null; try { if (refresh) { throw new IOException(); } - input = new FileInputStream(mmagenthome + "/etc/" + agentName + propName + ".properties"); + input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties"); } catch (IOException ex) { - logger.error(" IOException will be handled " + ex); try { input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties"); Properties prop = new Properties(); prop.load(input); if (propName.equals("consumer")) { - prop.setProperty("group.id", agentName); - prop.setProperty("zookeeper.connect", info); - } else { - prop.setProperty("metadata.broker.list", info); + prop.setProperty("group.id", mm.name); + + prop.setProperty("bootstrap.servers", mm.consumer); + prop.setProperty("client.id", mm.name + "MM_consumer"); + } else { + prop.setProperty("bootstrap.servers", mm.producer); + prop.setProperty("client.id", mm.name + "MM_producer"); + } - out = new FileOutputStream(mmagenthome + "/etc/" + agentName + propName + ".properties"); + out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties"); prop.store(out, ""); } catch (Exception e) { - logger.error("Exception at checkPropertiesFile " +e); + e.printStackTrace(); } } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error("Exception occurred is " +e); + e.printStackTrace(); } } if (out != null) { @@ -183,27 +188,26 @@ public class MirrorMakerAgent {/* out.close(); } catch (IOException e) { e.printStackTrace(); - logger.error("Exception is : "+e); } } } } - private void checkAgentProcess() { + private void checkAgentProcess() throws Exception { logger.info("Checking MirrorMaker Process"); if (mirrorMakers != null) { int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); for (int i = 0; i < mirrorMakersCount; i++) { MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); - if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name) == false) { - checkPropertiesFile(mm.name, "consumer", mm.consumer, false); - checkPropertiesFile(mm.name, "producer", mm.producer, false); + if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name,mm.enablelogCheck,this.grepLog) == false) { + checkPropertiesFile(mm, "consumer", false); + checkPropertiesFile(mm, "producer", false); if (mm.whitelist != null && !mm.whitelist.equals("")) { logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name, mmagenthome + "/etc/" + mm.name + "consumer.properties", - mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist); + mmagenthome + "/etc/" + mm.name + "producer.properties",mm.numStreams, mm.whitelist); mm.setStatus("RESTARTING"); } else { @@ -213,7 +217,6 @@ public class MirrorMakerAgent {/* try { Thread.sleep(1000); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } mirrorMakers.getListMirrorMaker().set(i, mm); } else { @@ -248,13 +251,18 @@ public class MirrorMakerAgent {/* response = response + line; } Gson g = new Gson(); + //Get message as JSON Array + JsonArray topicMessage = g.fromJson(response, JsonArray.class); + if (topicMessage.size() != 0) { + return topicMessage.get(0).toString(); + } + // get message as JSON String Array String[] topicMessage = g.fromJson(response, String[].class); if (topicMessage.length != 0) { return topicMessage[0]; } } catch (Exception e) { - logger.error(" Exception Occered " + e); return "ERROR:" + e.getMessage() + " Server Response is:" + response; } return null; @@ -285,7 +293,6 @@ public class MirrorMakerAgent {/* return response; } catch (Exception e) { - logger.error(" Exception Occered " + e); return "ERROR:" + e.getLocalizedMessage(); } } @@ -301,6 +308,12 @@ public class MirrorMakerAgent {/* LinkedTreeMap<?, ?> object = null; if (topicMessage != null) { try { + //Check and parse if String object returned by consumer API + //else use the jsonObject + if( topicMessage.startsWith("\"")) + { + topicMessage = g.fromJson(topicMessage.toString(), String.class); + } object = g.fromJson(topicMessage, LinkedTreeMap.class); // Cast the 1st item (since limit=1 and see the type of @@ -316,6 +329,11 @@ public class MirrorMakerAgent {/* } else if (object.get("updateMirrorMaker") != null) { logger.info("Received updateMirrorMaker request from topic"); UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); + JSONObject json = new JSONObject(topicMessage); + JSONObject json2 = (JSONObject) json.get("updateMirrorMaker"); + if(!json2.has("numStreams")){ + m.getUpdateMirrorMaker().setNumStreams(0); + } updateMirrorMaker(m.getUpdateMirrorMaker()); checkAgentProcess(); mirrorMakers.setMessageID(m.getMessageID()); @@ -351,7 +369,7 @@ public class MirrorMakerAgent {/* } catch (Exception ex) { connectionattempt++; if (connectionattempt > 5) { - logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage + ex); + logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage); return; } logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt @@ -366,12 +384,12 @@ public class MirrorMakerAgent {/* } } catch (Exception e) { - logger.error("Exception at readAgentTopic : " + e); + e.printStackTrace(); } } - protected void createMirrorMaker(MirrorMaker newMirrorMaker) { + private void createMirrorMaker(MirrorMaker newMirrorMaker) { boolean exists = false; if (mirrorMakers != null) { int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size(); @@ -390,12 +408,12 @@ public class MirrorMakerAgent {/* } else if (exists == false && mirrorMakers == null) { mirrorMakers = new ListMirrorMaker(); ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker(); - list = new ArrayList(); + list = new ArrayList<MirrorMaker>(); list.add(newMirrorMaker); mirrorMakers.setListMirrorMaker(list); } - checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true); - checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true); + checkPropertiesFile(newMirrorMaker, "consumer", true); + checkPropertiesFile(newMirrorMaker, "producer", true); Gson g = new Gson(); mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); @@ -404,7 +422,7 @@ public class MirrorMakerAgent {/* out = new FileOutputStream(mmagenthome + "/etc/mmagent.config"); mirrorMakerProperties.store(out, ""); } catch (IOException ex) { - logger.error(" IOException Occered " + ex); + ex.printStackTrace(); } finally { if (out != null) { try { @@ -424,16 +442,30 @@ public class MirrorMakerAgent {/* MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i); if (mm.name.equals(newMirrorMaker.name)) { exists = true; - mm.setConsumer(newMirrorMaker.getConsumer()); - mm.setProducer(newMirrorMaker.getProducer()); + if(null!=newMirrorMaker.getConsumer()) + { + mm.setConsumer(newMirrorMaker.getConsumer()); + } + if(null!=newMirrorMaker.getProducer()) + { + mm.setProducer(newMirrorMaker.getProducer()); + } + if(newMirrorMaker.getNumStreams()>=1) + { + mm.setNumStreams(newMirrorMaker.getNumStreams()); + } + + mm.setEnablelogCheck(newMirrorMaker.enablelogCheck); + mirrorMakers.getListMirrorMaker().set(i, mm); + newMirrorMaker=mm; logger.info("Updating MirrorMaker:" + newMirrorMaker.name); } } } if (exists) { - checkPropertiesFile(newMirrorMaker.name, "consumer", newMirrorMaker.consumer, true); - checkPropertiesFile(newMirrorMaker.name, "producer", newMirrorMaker.producer, true); + checkPropertiesFile(newMirrorMaker, "consumer", true); + checkPropertiesFile(newMirrorMaker, "producer", true); Gson g = new Gson(); mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers)); @@ -445,17 +477,15 @@ public class MirrorMakerAgent {/* try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.log(Level.WARN, "Interrupted!", e); - Thread.currentThread().interrupt(); } } catch (IOException ex) { - logger.error(" IOException Occered " + ex); + ex.printStackTrace(); } finally { if (out != null) { try { out.close(); } catch (IOException e) { - logger.error(" IOException Occered " + e); + e.printStackTrace(); } } } @@ -490,18 +520,15 @@ public class MirrorMakerAgent {/* try { Thread.sleep(1000); } catch (InterruptedException e) { - logger.log(Level.WARN, "Interrupted!", e); - Thread.currentThread().interrupt(); } } catch (IOException ex) { - logger.error("Exception at updateWhiteList : " + ex); + ex.printStackTrace(); } finally { if (out != null) { try { out.close(); } catch (IOException e) { e.printStackTrace(); - logger.error("IOException occered " + e); } } } @@ -581,11 +608,11 @@ public class MirrorMakerAgent {/* this.topicURL = mirrorMakerProperties.getProperty("topicURL"); this.topicname = mirrorMakerProperties.getProperty("topicname"); this.mechid = mirrorMakerProperties.getProperty("mechid"); + this.grepLog= mirrorMakerProperties.getProperty("grepLog"); BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); textEncryptor.setPassword(secret); - //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); - this.password = mirrorMakerProperties.getProperty("password"); + this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); } catch (IOException ex) { // ex.printStackTrace(); } finally { diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java b/src/main/java/com/att/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java index 7094ba4..234f0f0 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/dao/CreateMirrorMaker.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.dao; public class CreateMirrorMaker { diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java b/src/main/java/com/att/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java index 68ef2e2..92bf678 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/dao/DeleteMirrorMaker.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.dao; public class DeleteMirrorMaker { diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/dao/ListMirrorMaker.java b/src/main/java/com/att/nsa/dmaapMMAgent/dao/ListMirrorMaker.java index 7ca1658..f655139 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/dao/ListMirrorMaker.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/dao/ListMirrorMaker.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.dao; import java.util.ArrayList; diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/dao/MirrorMaker.java b/src/main/java/com/att/nsa/dmaapMMAgent/dao/MirrorMaker.java index 61426c9..cdf6584 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/dao/MirrorMaker.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/dao/MirrorMaker.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.dao; public class MirrorMaker { @@ -28,6 +27,8 @@ public class MirrorMaker { public String producer; public String whitelist; public String status; + public int numStreams=1; + public boolean enablelogCheck = false; public String getStatus() { return status; @@ -69,4 +70,20 @@ public class MirrorMaker { this.whitelist = whitelist; } + public int getNumStreams() { + return numStreams; + } + + public void setNumStreams(int numStreams) { + this.numStreams = numStreams; + } + + public boolean isEnablelogCheck() { + return enablelogCheck; + } + + public void setEnablelogCheck(boolean enablelogCheck) { + this.enablelogCheck = enablelogCheck; + } + }
\ No newline at end of file diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java b/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java index 336d240..fdb8d7f 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateMirrorMaker.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.dao; public class UpdateMirrorMaker { diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateWhiteList.java b/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateWhiteList.java index 3227c51..9b9de83 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateWhiteList.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/dao/UpdateWhiteList.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.dao; public class UpdateWhiteList { diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java b/src/main/java/com/att/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java index dd1442e..e4a0b97 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/utils/MirrorMakerProcessHandler.java @@ -19,7 +19,6 @@ * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ - package com.att.nsa.dmaapMMAgent.utils; import java.io.BufferedReader; @@ -28,13 +27,12 @@ import java.io.InputStreamReader; import org.apache.log4j.Logger; -import com.att.nsa.dmaapMMAgent.MirrorMakerAgent; - - -public class MirrorMakerProcessHandler { +public class MirrorMakerProcessHandler {/* static final Logger logger = Logger.getLogger(MirrorMakerProcessHandler.class); + static String mmagenthome = System.getProperty("MMAGENTHOME"); - public static boolean checkMirrorMakerProcess(String agentname) { + public static boolean checkMirrorMakerProcess(String agentname, boolean enablelogCheck, String grepLog) throws Exception { + String line,linelog; try { Runtime rt = Runtime.getRuntime(); Process mmprocess = null; @@ -45,22 +43,61 @@ public class MirrorMakerProcessHandler { + "~%' and caption='java.exe'\""; mmprocess = rt.exec(args); } else { - String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" }; + //String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" }; + + String args[] = { "/bin/sh", "-c", "ps -ef | grep `ps -ef |grep agentname=" + agentname + "~ | egrep -v 'grep|java' | awk '{print $2}' `| egrep -v '/bin/sh|grep' "}; + logger.info("CheckMM process->"+args[2]); mmprocess = rt.exec(args); } InputStream is = mmprocess.getInputStream(); InputStreamReader isr = new InputStreamReader(is); BufferedReader br = new BufferedReader(isr); - String line; + while ((line = br.readLine()) != null) { - // System.out.println(line); - if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) { - return true; - } + System.out.println(line); + // if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) { + + //If enablelogCheck Check MirrorMaker log for errors and restart mirrormaker + if(enablelogCheck) { + logger.info("Check if MM log contains any errors"); + String args2[]; + args2 = new String[] { "/bin/sh", "-c", "grep -i ERROR "+ mmagenthome + "/logs/" + agentname + "_MMaker.log"}; + if(null!=grepLog && !grepLog.isEmpty()) + { + args2 = new String[]{ "/bin/sh", "-c", grepLog +" " + mmagenthome + "/logs/" + agentname + "_MMaker.log"}; + } + logger.info("Grep log args-- "+args2[2]); + mmprocess = rt.exec(args2); + InputStream islog = mmprocess.getInputStream(); + InputStreamReader isrlog = new InputStreamReader(islog); + BufferedReader brlog = new BufferedReader(isrlog); + + while ((linelog = brlog.readLine()) != null) + { + logger.info("Error from MM log--"+linelog); + + if (linelog.toLowerCase().contains("ERROR".toLowerCase()) || + linelog.toLowerCase().contains("Issue".toLowerCase()) ) + { + logger.info("MM log contains error Stop MM and restart"); + stopMirrorMaker(agentname); + isrlog.close(); + brlog.close(); + return false; + } + + + } + isrlog.close(); + brlog.close(); + } + + return true; + // } } } catch (Exception e) { - logger.error("Error at checkMirrorMakerProcess method:" + e.getMessage()); + e.printStackTrace(); } return false; } @@ -75,8 +112,14 @@ public class MirrorMakerProcessHandler { + "~%' and caption='java.exe'\" call terminate"; killprocess = rt.exec(args); } else { + //String args[] = { "/bin/sh", "-c", + // "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" }; + + //String args[] = { "/bin/sh", "-c", + // "kill -9 `ps -ef |grep agentname=" + agentname + "~| egrep -v 'grep|java' | awk '{print $2}'` | egrep -v '/bin/sh|grep'"}; String args[] = { "/bin/sh", "-c", - "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" }; + "for i in `ps -ef |grep agentname="+ agentname + "~ | egrep -v 'grep|java' | awk '{print $2}'`;do kill -9 `ps -eaf | grep $i | egrep -v '/bin/sh|grep' | awk '{print $2}'` ;done"}; + logger.info ("Stop MM ->"+args[2]); // args = "kill $(ps -ef |grep java |grep agentname=" + // agentname + "~| awk '{print $2}')"; killprocess = rt.exec(args); @@ -92,20 +135,20 @@ public class MirrorMakerProcessHandler { logger.info("Mirror Maker " + agentname + " Stopped"); } catch (Exception e) { - logger.error("Error at stopMirrorMaker method:" + e.getMessage()); + e.printStackTrace(); } } public static void startMirrorMaker(String mmagenthome, String kafkaHome, String agentName, String consumerConfig, - String producerConfig, String whitelist) { + String producerConfig, int numStreams, String whitelist) { try { Runtime rt = Runtime.getRuntime(); if (System.getProperty("os.name").contains("Windows")) { String args = kafkaHome + "/bin/windows/kafka-run-class.bat -Dagentname=" + agentName + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + " --producer.config " - + producerConfig + " --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName + + producerConfig +" --num.streams " + numStreams + " --abort.on.send.failure true" +" --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName + "_MMaker.log"; final Process process = rt.exec(args); new Thread() { @@ -119,7 +162,7 @@ public class MirrorMakerProcessHandler { // System.out.println(line); } } catch (Exception anExc) { - logger.error("Error at startMirrorMaker method:" + anExc.getMessage()); + anExc.printStackTrace(); } } }.start(); @@ -127,7 +170,7 @@ public class MirrorMakerProcessHandler { String args[] = { "/bin/sh", "-c", kafkaHome + "/bin/kafka-run-class.sh -Dagentname=" + agentName + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig - + " --producer.config " + producerConfig + " --whitelist '" + whitelist + "' >" + + " --producer.config " + producerConfig + " --num.streams " + numStreams + " --abort.on.send.failure true" + " --whitelist '" + whitelist + "' >" + mmagenthome + "/logs/" + agentName + "_MMaker.log 2>&1" }; final Process process = rt.exec(args); new Thread() { @@ -141,7 +184,7 @@ public class MirrorMakerProcessHandler { // System.out.println(line); } } catch (Exception anExc) { - logger.error("Exception at startMirrorMaker method else part run method:" + anExc.getMessage()); + anExc.printStackTrace(); } } }.start(); @@ -153,4 +196,4 @@ public class MirrorMakerProcessHandler { e.printStackTrace(); } } -} +*/} |