diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java | 111 |
1 files changed, 61 insertions, 50 deletions
diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java index 71bd85c..a80d5b9 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java @@ -52,7 +52,7 @@ import com.google.gson.internal.LinkedTreeMap; import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; -public class MirrorMakerAgent {/* +public class MirrorMakerAgent { static final Logger logger = Logger.getLogger(MirrorMakerAgent.class); Properties mirrorMakerProperties = new Properties(); ListMirrorMaker mirrorMakers = null; @@ -62,6 +62,7 @@ public class MirrorMakerAgent {/* String topicname = ""; String mechid = ""; String password = ""; + public boolean exitLoop=false; private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; public static void main(String[] args) { @@ -110,7 +111,7 @@ public class MirrorMakerAgent {/* loadProperties(); input = null; try { - input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh"); + /*input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");*/ if(false) { throw new IOException(); } @@ -227,7 +228,7 @@ public class MirrorMakerAgent {/* // System.out.println(g.toJson(mirrorMakers)); } - private String subscribeTopic(String timeout) { + public String subscribeTopic(String timeout) { String response = ""; try { String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout @@ -260,7 +261,7 @@ public class MirrorMakerAgent {/* return null; } - private String publishTopic(String message) { + public String publishTopic(String message) { try { String requestURL = this.topicURL + "/events/" + this.topicname; String authString = this.mechid + ":" + this.password; @@ -290,7 +291,7 @@ public class MirrorMakerAgent {/* } } - private void readAgentTopic() { + public void readAgentTopic() { try { int connectionattempt = 0; while (true) { @@ -305,49 +306,7 @@ public class MirrorMakerAgent {/* // Cast the 1st item (since limit=1 and see the type of // object - if (object.get("createMirrorMaker") != null) { - logger.info("Received createMirrorMaker request from topic"); - CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); - createMirrorMaker(m.getCreateMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("updateMirrorMaker") != null) { - logger.info("Received updateMirrorMaker request from topic"); - UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); - updateMirrorMaker(m.getUpdateMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("deleteMirrorMaker") != null) { - logger.info("Received deleteMirrorMaker request from topic"); - DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); - deleteMirrorMaker(m.getDeleteMirrorMaker()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("listAllMirrorMaker") != null) { - logger.info("Received listALLMirrorMaker request from topic"); - checkAgentProcess(); - mirrorMakers.setMessageID((String) object.get("messageID")); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("updateWhiteList") != null) { - logger.info("Received updateWhiteList request from topic"); - UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); - updateWhiteList(m.getUpdateWhiteList()); - checkAgentProcess(); - mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); - } else if (object.get("listMirrorMaker") != null) { - logger.info("Received listMirrorMaker from topic, skipping messages"); - } else { - logger.info("Received unknown request from topic"); - } + readAgent(object, topicMessage); } catch (Exception ex) { connectionattempt++; if (connectionattempt > 5) { @@ -363,13 +322,65 @@ public class MirrorMakerAgent {/* connectionattempt = 0; checkAgentProcess(); } - + if(exitLoop){ + break; + } } } catch (Exception e) { logger.error("Exception at readAgentTopic : " + e); } } + + public void readAgent(LinkedTreeMap<?, ?> object,String topicMessage){ + + Gson g = new Gson(); + + if (object.get("createMirrorMaker") != null) { + logger.info("Received createMirrorMaker request from topic"); + CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); + createMirrorMaker(m.getCreateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateMirrorMaker") != null) { + logger.info("Received updateMirrorMaker request from topic"); + UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class); + updateMirrorMaker(m.getUpdateMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("deleteMirrorMaker") != null) { + logger.info("Received deleteMirrorMaker request from topic"); + DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class); + deleteMirrorMaker(m.getDeleteMirrorMaker()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listAllMirrorMaker") != null) { + logger.info("Received listALLMirrorMaker request from topic"); + checkAgentProcess(); + mirrorMakers.setMessageID((String) object.get("messageID")); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("updateWhiteList") != null) { + logger.info("Received updateWhiteList request from topic"); + UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); + updateWhiteList(m.getUpdateWhiteList()); + checkAgentProcess(); + mirrorMakers.setMessageID(m.getMessageID()); + publishTopic(g.toJson(mirrorMakers)); + mirrorMakers.setMessageID(""); + } else if (object.get("listMirrorMaker") != null) { + logger.info("Received listMirrorMaker from topic, skipping messages"); + } else { + logger.info("Received unknown request from topic"); + } + + } protected void createMirrorMaker(MirrorMaker newMirrorMaker) { boolean exists = false; @@ -599,4 +610,4 @@ public class MirrorMakerAgent {/* } } -*/} +} |