diff options
Diffstat (limited to 'src')
3 files changed, 143 insertions, 115 deletions
diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java index a80d5b9..3f993db 100644 --- a/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java +++ b/src/main/java/com/att/nsa/dmaapMMAgent/MirrorMakerAgent.java @@ -22,17 +22,12 @@ package com.att.nsa.dmaapMMAgent; -import java.io.BufferedReader; -import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.URL; import java.util.ArrayList; import java.util.Properties; @@ -49,8 +44,6 @@ import com.att.nsa.dmaapMMAgent.dao.UpdateWhiteList; import com.att.nsa.dmaapMMAgent.utils.MirrorMakerProcessHandler; import com.google.gson.Gson; import com.google.gson.internal.LinkedTreeMap; -import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny; -import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; public class MirrorMakerAgent { static final Logger logger = Logger.getLogger(MirrorMakerAgent.class); @@ -62,7 +55,8 @@ public class MirrorMakerAgent { String topicname = ""; String mechid = ""; String password = ""; - public boolean exitLoop=false; + TopicUtil topicUtil = new TopicUtil(); + public boolean exitLoop = false; private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J"; public static void main(String[] args) { @@ -97,7 +91,8 @@ public class MirrorMakerAgent { input = new FileInputStream(mmagenthome + "/etc/mmagent.config"); logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config"); } catch (IOException ex) { - logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex); + logger.error( + mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file" + ex); return false; } finally { if (input != null) { @@ -111,8 +106,11 @@ public class MirrorMakerAgent { loadProperties(); input = null; try { - /*input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");*/ - if(false) { + /* + * input = new FileInputStream(kafkahome + + * "/bin/kafka-run-class.sh"); + */ + if (false) { throw new IOException(); } logger.info("kakahome is set :" + kafkahome); @@ -128,14 +126,14 @@ public class MirrorMakerAgent { } } } - String response = publishTopic("{\"test\":\"test\"}"); + String response = topicUtil.publishTopic(topicURL, topicname, mechid, password, "{\"test\":\"test\"}"); if (response.startsWith("ERROR:")) { logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:" + this.topicURL + " Error is: " + response); return false; } logger.info("Published to Topic :" + this.topicname + " Successfully"); - response = subscribeTopic("1"); + response = topicUtil.subscribeTopic(topicURL, topicname, "1", response, response); if (response != null && response.startsWith("ERROR:")) { logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:" + this.topicURL + " Error is: " + response); @@ -169,14 +167,14 @@ public class MirrorMakerAgent { prop.store(out, ""); } catch (Exception e) { - logger.error("Exception at checkPropertiesFile " +e); + logger.error("Exception at checkPropertiesFile " + e); } } finally { if (input != null) { try { input.close(); } catch (IOException e) { - logger.error("Exception occurred is " +e); + logger.error("Exception occurred is " + e); } } if (out != null) { @@ -184,7 +182,7 @@ public class MirrorMakerAgent { out.close(); } catch (IOException e) { e.printStackTrace(); - logger.error("Exception is : "+e); + logger.error("Exception is : " + e); } } } @@ -201,7 +199,8 @@ public class MirrorMakerAgent { checkPropertiesFile(mm.name, "producer", mm.producer, false); if (mm.whitelist != null && !mm.whitelist.equals("")) { - logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); + logger.info( + "MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details"); MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name, mmagenthome + "/etc/" + mm.name + "consumer.properties", mmagenthome + "/etc/" + mm.name + "producer.properties", mm.whitelist); @@ -214,7 +213,7 @@ public class MirrorMakerAgent { try { Thread.sleep(1000); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + Thread.currentThread().interrupt(); } mirrorMakers.getListMirrorMaker().set(i, mm); } else { @@ -228,76 +227,13 @@ public class MirrorMakerAgent { // System.out.println(g.toJson(mirrorMakers)); } - public String subscribeTopic(String timeout) { - String response = ""; - try { - String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout - + "&limit=1"; - String authString = this.mechid + ":" + this.password; - String authStringEnc = Base64.encode(authString.getBytes()); - URL url = new URL(requestURL); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("GET"); - connection.setDoOutput(true); - connection.setRequestProperty("Authorization", "Basic " + authStringEnc); - connection.setRequestProperty("Content-Type", "application/json"); - InputStream content = (InputStream) connection.getInputStream(); - BufferedReader in = new BufferedReader(new InputStreamReader(content)); - String line; - - while ((line = in.readLine()) != null) { - response = response + line; - } - Gson g = new Gson(); - // get message as JSON String Array - String[] topicMessage = g.fromJson(response, String[].class); - if (topicMessage.length != 0) { - return topicMessage[0]; - } - } catch (Exception e) { - logger.error(" Exception Occered " + e); - return "ERROR:" + e.getMessage() + " Server Response is:" + response; - } - return null; - } - - public String publishTopic(String message) { - try { - String requestURL = this.topicURL + "/events/" + this.topicname; - String authString = this.mechid + ":" + this.password; - String authStringEnc = Base64.encode(authString.getBytes()); - URL url = new URL(requestURL); - HttpURLConnection connection = (HttpURLConnection) url.openConnection(); - connection.setRequestMethod("POST"); - connection.setDoOutput(true); - connection.setRequestProperty("Authorization", "Basic " + authStringEnc); - connection.setRequestProperty("Content-Type", "application/json"); - connection.setRequestProperty("Content-Length", Integer.toString(message.length())); - DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); - wr.write(message.getBytes()); - - InputStream content = (InputStream) connection.getInputStream(); - BufferedReader in = new BufferedReader(new InputStreamReader(content)); - String line; - String response = ""; - while ((line = in.readLine()) != null) { - response = response + line; - } - return response; - - } catch (Exception e) { - logger.error(" Exception Occered " + e); - return "ERROR:" + e.getLocalizedMessage(); - } - } - public void readAgentTopic() { try { int connectionattempt = 0; while (true) { logger.info("--------------------------------"); logger.info("Waiting for Messages for 60 secs"); - String topicMessage = subscribeTopic("60000"); + String topicMessage = topicUtil.subscribeTopic(topicURL, topicname, "60000", mechid, password); Gson g = new Gson(); LinkedTreeMap<?, ?> object = null; if (topicMessage != null) { @@ -322,27 +258,27 @@ public class MirrorMakerAgent { connectionattempt = 0; checkAgentProcess(); } - if(exitLoop){ - break; - } + if (exitLoop) { + break; + } } } catch (Exception e) { logger.error("Exception at readAgentTopic : " + e); } } - - public void readAgent(LinkedTreeMap<?, ?> object,String topicMessage){ - + + public void readAgent(LinkedTreeMap<?, ?> object, String topicMessage) { + Gson g = new Gson(); - + if (object.get("createMirrorMaker") != null) { logger.info("Received createMirrorMaker request from topic"); CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class); createMirrorMaker(m.getCreateMirrorMaker()); checkAgentProcess(); mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); + topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); mirrorMakers.setMessageID(""); } else if (object.get("updateMirrorMaker") != null) { logger.info("Received updateMirrorMaker request from topic"); @@ -350,7 +286,7 @@ public class MirrorMakerAgent { updateMirrorMaker(m.getUpdateMirrorMaker()); checkAgentProcess(); mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); + topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); mirrorMakers.setMessageID(""); } else if (object.get("deleteMirrorMaker") != null) { logger.info("Received deleteMirrorMaker request from topic"); @@ -358,28 +294,27 @@ public class MirrorMakerAgent { deleteMirrorMaker(m.getDeleteMirrorMaker()); checkAgentProcess(); mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); + topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); mirrorMakers.setMessageID(""); } else if (object.get("listAllMirrorMaker") != null) { logger.info("Received listALLMirrorMaker request from topic"); checkAgentProcess(); mirrorMakers.setMessageID((String) object.get("messageID")); - publishTopic(g.toJson(mirrorMakers)); - mirrorMakers.setMessageID(""); + topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); } else if (object.get("updateWhiteList") != null) { logger.info("Received updateWhiteList request from topic"); UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class); updateWhiteList(m.getUpdateWhiteList()); checkAgentProcess(); mirrorMakers.setMessageID(m.getMessageID()); - publishTopic(g.toJson(mirrorMakers)); + topicUtil.publishTopic(topicMessage, topicMessage, topicMessage, topicMessage, g.toJson(mirrorMakers)); mirrorMakers.setMessageID(""); } else if (object.get("listMirrorMaker") != null) { logger.info("Received listMirrorMaker from topic, skipping messages"); } else { logger.info("Received unknown request from topic"); } - + } protected void createMirrorMaker(MirrorMaker newMirrorMaker) { @@ -595,7 +530,8 @@ public class MirrorMakerAgent { BasicTextEncryptor textEncryptor = new BasicTextEncryptor(); textEncryptor.setPassword(secret); - //this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); + // this.password = + // textEncryptor.decrypt(mirrorMakerProperties.getProperty("password")); this.password = mirrorMakerProperties.getProperty("password"); } catch (IOException ex) { // ex.printStackTrace(); diff --git a/src/main/java/com/att/nsa/dmaapMMAgent/TopicUtil.java b/src/main/java/com/att/nsa/dmaapMMAgent/TopicUtil.java new file mode 100644 index 0000000..f3b8552 --- /dev/null +++ b/src/main/java/com/att/nsa/dmaapMMAgent/TopicUtil.java @@ -0,0 +1,104 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +package com.att.nsa.dmaapMMAgent; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; + +import org.apache.log4j.Logger; + +import com.google.gson.Gson; +import com.sun.org.apache.xerces.internal.impl.dv.util.Base64; + +public class TopicUtil { + + static final Logger logger = Logger.getLogger(TopicUtil.class); + + public String publishTopic(String topicURL, String topicname, String mechid, String password, String message) { + try { + String requestURL = topicURL + "/events/" + topicname; + String authString = mechid + ":" + password; + String authStringEnc = Base64.encode(authString.getBytes()); + URL url = new URL(requestURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("POST"); + connection.setDoOutput(true); + connection.setRequestProperty("Authorization", "Basic " + authStringEnc); + connection.setRequestProperty("Content-Type", "application/json"); + connection.setRequestProperty("Content-Length", Integer.toString(message.length())); + DataOutputStream wr = new DataOutputStream(connection.getOutputStream()); + wr.write(message.getBytes()); + + InputStream content = (InputStream) connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + String line; + String response = ""; + while ((line = in.readLine()) != null) { + response = response + line; + } + return response; + + } catch (Exception e) { + logger.error(" Exception Occered " + e); + return "ERROR:" + e.getLocalizedMessage(); + } + } + + public String subscribeTopic(String topicURL, String topicname, String timeout, String mechid, String password) { + String response = ""; + try { + String requestURL = topicURL + "/events/" + topicname + "/mirrormakeragent/1?timeout=" + timeout + + "&limit=1"; + String authString = mechid + ":" + password; + String authStringEnc = Base64.encode(authString.getBytes()); + URL url = new URL(requestURL); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setDoOutput(true); + connection.setRequestProperty("Authorization", "Basic " + authStringEnc); + connection.setRequestProperty("Content-Type", "application/json"); + InputStream content = (InputStream) connection.getInputStream(); + BufferedReader in = new BufferedReader(new InputStreamReader(content)); + String line; + + while ((line = in.readLine()) != null) { + response = response + line; + } + Gson g = new Gson(); + // get message as JSON String Array + String[] topicMessage = g.fromJson(response, String[].class); + if (topicMessage.length != 0) { + return topicMessage[0]; + } + } catch (Exception e) { + logger.error(" Exception Occered " + e); + return "ERROR:" + e.getMessage() + " Server Response is:" + response; + } + return null; + } + +} diff --git a/src/test/java/com/att/nsa/dmaapMMAgent/TestMirrorMakerAgent.java b/src/test/java/com/att/nsa/dmaapMMAgent/TestMirrorMakerAgent.java index 8768962..1dcd6c8 100644 --- a/src/test/java/com/att/nsa/dmaapMMAgent/TestMirrorMakerAgent.java +++ b/src/test/java/com/att/nsa/dmaapMMAgent/TestMirrorMakerAgent.java @@ -24,13 +24,14 @@ package com.att.nsa.dmaapMMAgent; import static org.junit.Assert.*; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import java.util.ArrayList; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.powermock.modules.junit4.PowerMockRunner; @@ -48,7 +49,10 @@ public class TestMirrorMakerAgent { MirrorMaker mirrorMaker2 = new MirrorMaker(); ArrayList<MirrorMaker> listsMirrorMaker = new ArrayList<MirrorMaker>(); Gson g = new Gson(); - MirrorMakerAgent agent = spy(new MirrorMakerAgent()); + @InjectMocks + private MirrorMakerAgent agent = spy(new MirrorMakerAgent()); + @Mock + private TopicUtil topicUtil; @Before public void setUp() { @@ -69,16 +73,12 @@ public class TestMirrorMakerAgent { @Test public void testReadAgentTopics() { - when(agent.subscribeTopic("60000")).thenReturn(null); agent.exitLoop = true; agent.readAgentTopic(); } @Test public void testReadCreateMirrorMaker() { - when(agent.publishTopic( - "{\"messageID\":\"test\",\"listMirrorMaker\":[{\"name\":\"test\",\"consumer\":\"test\",\"producer\":\"test\",\"status\":\"STOPPED\"}]}")) - .thenReturn(null); String topicMessage = "{ messageID:\"test\", createMirrorMaker: { name:\"test\", consumer:\"test\", producer:\"test\"}}"; LinkedTreeMap<?, ?> object = g.fromJson(topicMessage, LinkedTreeMap.class); agent.readAgent(object, topicMessage); @@ -87,9 +87,6 @@ public class TestMirrorMakerAgent { @Test public void testReadUpdateMirrorMaker() { - when(agent.publishTopic( - "{\"messageID\":\"test\",\"listMirrorMaker\":[{\"name\":\"test\",\"consumer\":\"test\",\"producer\":\"test\",\"status\":\"STOPPED\"}]}")) - .thenReturn(null); String topicMessage = "{ messageID:\"test\", updateMirrorMaker: { name:\"test\", consumer:\"test\", producer:\"test\"}}"; LinkedTreeMap<?, ?> object = g.fromJson(topicMessage, LinkedTreeMap.class); testReadCreateMirrorMaker(); @@ -99,9 +96,6 @@ public class TestMirrorMakerAgent { @Test public void testReadDeleteMirrorMaker() { - when(agent.publishTopic( - "{\"messageID\":\"test\",\"listMirrorMaker\":[{\"name\":\"test\",\"consumer\":\"test\",\"producer\":\"test\",\"status\":\"STOPPED\"}]}")) - .thenReturn(null); String topicMessage = "{ messageID:\"test\", deleteMirrorMaker: { name:\"test\", consumer:\"test\", producer:\"test\", whitelist:\"test\",status:\"test\" }}"; LinkedTreeMap<?, ?> object = g.fromJson(topicMessage, LinkedTreeMap.class); testReadCreateMirrorMaker(); @@ -111,9 +105,6 @@ public class TestMirrorMakerAgent { @Test public void testReadListMirrorMaker() { - when(agent.publishTopic( - "{\"messageID\":\"test\",\"listMirrorMaker\":[{\"name\":\"test\",\"consumer\":\"test\",\"producer\":\"test\",\"status\":\"STOPPED\"}]}")) - .thenReturn(null); String topicMessage = "{ messageID:\"test\", listAllMirrorMaker: { name:\"test\", consumer:\"test\", producer:\"test\", whitelist:\"test\",status:\"test\" }}"; LinkedTreeMap<?, ?> object = g.fromJson(topicMessage, LinkedTreeMap.class); testReadCreateMirrorMaker(); @@ -123,9 +114,6 @@ public class TestMirrorMakerAgent { @Test public void testReadWhitelistMirrorMaker() { - when(agent.publishTopic( - "{\"messageID\":\"test\",\"listMirrorMaker\":[{\"name\":\"test\",\"consumer\":\"test\",\"producer\":\"test\",\"status\":\"STOPPED\"}]}")) - .thenReturn(null); String topicMessage = "{ messageID:\"test\", updateWhiteList: { name:\"test\", consumer:\"test\", producer:\"test\", whitelist:\"test\",status:\"test\" }}"; LinkedTreeMap<?, ?> object = g.fromJson(topicMessage, LinkedTreeMap.class); testReadCreateMirrorMaker(); |