summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/mr
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/mr')
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java629
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/CreateMirrorMaker.java44
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/DeleteMirrorMaker.java43
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/ListMirrorMaker.java45
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/MirrorMaker.java89
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateMirrorMaker.java43
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateWhiteList.java43
-rw-r--r--src/main/java/org/onap/dmaap/mr/dmaapMMAgent/utils/MirrorMakerProcessHandler.java200
8 files changed, 1136 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java
new file mode 100644
index 0000000..becfbc5
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/MirrorMakerAgent.java
@@ -0,0 +1,629 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent;
+
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Properties;
+//import org.json.JSONObject;
+//import org.apache.log4j.Logger;
+//import org.jasypt.util.text.BasicTextEncryptor;
+
+import org.onap.dmaap.mr.dmaapMMAgent.dao.CreateMirrorMaker;
+import org.onap.dmaap.mr.dmaapMMAgent.dao.DeleteMirrorMaker;
+import org.onap.dmaap.mr.dmaapMMAgent.dao.ListMirrorMaker;
+import org.onap.dmaap.mr.dmaapMMAgent.dao.MirrorMaker;
+import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateMirrorMaker;
+import org.onap.dmaap.mr.dmaapMMAgent.dao.UpdateWhiteList;
+import org.onap.dmaap.mr.dmaapMMAgent.utils.MirrorMakerProcessHandler;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.internal.LinkedTreeMap;
+import com.sun.org.apache.xerces.internal.impl.dtd.models.CMAny;
+import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
+
+public class MirrorMakerAgent {/*
+ static final Logger logger = Logger.getLogger(MirrorMakerAgent.class);
+ Properties mirrorMakerProperties = new Properties();
+ ListMirrorMaker mirrorMakers = null;
+ String mmagenthome = "";
+ String kafkahome = "";
+ String topicURL = "";
+ String topicname = "";
+ String mechid = "";
+ String password = "";
+ String grepLog = "";
+ private static String secret = "utdfpWlgyDQ2ZB8SLVRtmN834I1JcT9J";
+
+ public static void main(String[] args) {
+ if (args != null && args.length == 2) {
+ if (args[0].equals("-encrypt")) {
+ BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
+ textEncryptor.setPassword(secret);
+ String plainText = textEncryptor.encrypt(args[1]);
+ System.out.println("Encrypted Password is :" + plainText);
+ return;
+ }
+ } else if (args != null && args.length > 0) {
+ System.out.println(
+ "Usage: ./mmagent to run with the configuration \n -encrypt <password> to Encrypt Password for config ");
+ return;
+ }
+ MirrorMakerAgent agent = new MirrorMakerAgent();
+ if (agent.checkStartup()) {
+ logger.info("mmagent started, loading properties");
+ try {
+ agent.checkAgentProcess();
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ }
+ agent.readAgentTopic();
+ } else {
+ System.out.println(
+ "ERROR: mmagent startup unsuccessful, please make sure the mmagenthome /etc/mmagent.config is set and mechid have the rights to the topic");
+ }
+ }
+
+ private boolean checkStartup() {
+ FileInputStream input = null;
+ try {
+ this.mmagenthome = System.getProperty("MMAGENTHOME");
+ input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
+ logger.info("mmagenthome is set :" + mmagenthome + " loading properties at /etc/mmagent.config");
+ } catch (IOException ex) {
+ logger.error(mmagenthome + "/etc/mmagent.config not found. Set -DMMAGENTHOME and check the config file");
+ return false;
+ } finally {
+ if (input != null) {
+ try {
+ input.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ loadProperties();
+ input = null;
+ try {
+ input = new FileInputStream(kafkahome + "/bin/kafka-run-class.sh");
+ logger.info("kakahome is set :" + kafkahome);
+ } catch (IOException ex) {
+ logger.error(kafkahome + "/bin/kafka-run-class.sh not found. Make sure kafka home is set correctly");
+ return false;
+ } finally {
+ if (input != null) {
+ try {
+ input.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ String response = publishTopic("{\"test\":\"test\"}");
+ if (response.startsWith("ERROR:")) {
+ logger.error("Problem publishing to topic, please verify the config " + this.topicname + " MR URL is:"
+ + this.topicURL + " Error is: " + response);
+ return false;
+ }
+ logger.info("Published to Topic :" + this.topicname + " Successfully");
+ response = subscribeTopic("1");
+ if (response != null && response.startsWith("ERROR:")) {
+ logger.error("Problem subscribing to topic, please verify the config " + this.topicname + " MR URL is:"
+ + this.topicURL + " Error is: " + response);
+ return false;
+ }
+ logger.info("Subscribed to Topic :" + this.topicname + " Successfully");
+ return true;
+ }
+
+ private void checkPropertiesFile(MirrorMaker mm, String propName, boolean refresh) {
+ InputStream input = null;
+ OutputStream out = null;
+ try {
+ if (refresh) {
+ throw new IOException();
+ }
+ input = new FileInputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
+ } catch (IOException ex) {
+ try {
+ input = new FileInputStream(mmagenthome + "/etc/" + propName + ".properties");
+ Properties prop = new Properties();
+ prop.load(input);
+ if (propName.equals("consumer")) {
+ prop.setProperty("group.id", mm.name);
+
+ prop.setProperty("bootstrap.servers", mm.consumer);
+ prop.setProperty("client.id", mm.name + "MM_consumer");
+ } else {
+ prop.setProperty("bootstrap.servers", mm.producer);
+ prop.setProperty("client.id", mm.name + "MM_producer");
+
+ }
+ out = new FileOutputStream(mmagenthome + "/etc/" + mm.name + propName + ".properties");
+ prop.store(out, "");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ } finally {
+ if (input != null) {
+ try {
+ input.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void checkAgentProcess() throws Exception {
+ logger.info("Checking MirrorMaker Process");
+ if (mirrorMakers != null) {
+ int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+ for (int i = 0; i < mirrorMakersCount; i++) {
+ MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+ if (MirrorMakerProcessHandler.checkMirrorMakerProcess(mm.name,mm.enablelogCheck,this.grepLog) == false) {
+ checkPropertiesFile(mm, "consumer", false);
+ checkPropertiesFile(mm, "producer", false);
+
+ if (mm.whitelist != null && !mm.whitelist.equals("")) {
+ logger.info("MirrorMaker " + mm.name + " is not running, restarting. Check Logs for more Details");
+ MirrorMakerProcessHandler.startMirrorMaker(this.mmagenthome, this.kafkahome, mm.name,
+ mmagenthome + "/etc/" + mm.name + "consumer.properties",
+ mmagenthome + "/etc/" + mm.name + "producer.properties",mm.numStreams, mm.whitelist);
+ mm.setStatus("RESTARTING");
+
+ } else {
+ logger.info("MirrorMaker " + mm.name + " is STOPPED");
+ mm.setStatus("STOPPED");
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ mirrorMakers.getListMirrorMaker().set(i, mm);
+ } else {
+ logger.info("MirrorMaker " + mm.name + " is running");
+ mm.setStatus("RUNNING");
+ mirrorMakers.getListMirrorMaker().set(i, mm);
+ }
+ }
+ }
+ // Gson g = new Gson();
+ // System.out.println(g.toJson(mirrorMakers));
+ }
+
+ private String subscribeTopic(String timeout) {
+ String response = "";
+ try {
+ String requestURL = this.topicURL + "/events/" + this.topicname + "/mirrormakeragent/1?timeout=" + timeout
+ + "&limit=1";
+ String authString = this.mechid + ":" + this.password;
+ String authStringEnc = Base64.encode(authString.getBytes());
+ URL url = new URL(requestURL);
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("GET");
+ connection.setDoOutput(true);
+ connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
+ connection.setRequestProperty("Content-Type", "application/json");
+ InputStream content = (InputStream) connection.getInputStream();
+ BufferedReader in = new BufferedReader(new InputStreamReader(content));
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ response = response + line;
+ }
+ Gson g = new Gson();
+ //Get message as JSON Array
+ JsonArray topicMessage = g.fromJson(response, JsonArray.class);
+ if (topicMessage.size() != 0) {
+ return topicMessage.get(0).toString();
+ }
+
+ // get message as JSON String Array
+ String[] topicMessage = g.fromJson(response, String[].class);
+ if (topicMessage.length != 0) {
+ return topicMessage[0];
+ }
+ } catch (Exception e) {
+ return "ERROR:" + e.getMessage() + " Server Response is:" + response;
+ }
+ return null;
+ }
+
+ private String publishTopic(String message) {
+ try {
+ String requestURL = this.topicURL + "/events/" + this.topicname;
+ String authString = this.mechid + ":" + this.password;
+ String authStringEnc = Base64.encode(authString.getBytes());
+ URL url = new URL(requestURL);
+ HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ connection.setRequestMethod("POST");
+ connection.setDoOutput(true);
+ connection.setRequestProperty("Authorization", "Basic " + authStringEnc);
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Content-Length", Integer.toString(message.length()));
+ DataOutputStream wr = new DataOutputStream(connection.getOutputStream());
+ wr.write(message.getBytes());
+
+ InputStream content = (InputStream) connection.getInputStream();
+ BufferedReader in = new BufferedReader(new InputStreamReader(content));
+ String line;
+ String response = "";
+ while ((line = in.readLine()) != null) {
+ response = response + line;
+ }
+ return response;
+
+ } catch (Exception e) {
+ return "ERROR:" + e.getLocalizedMessage();
+ }
+ }
+
+ private void readAgentTopic() {
+ try {
+ int connectionattempt = 0;
+ while (true) {
+ logger.info("--------------------------------");
+ logger.info("Waiting for Messages for 60 secs");
+ String topicMessage = subscribeTopic("60000");
+ Gson g = new Gson();
+ LinkedTreeMap<?, ?> object = null;
+ if (topicMessage != null) {
+ try {
+ //Check and parse if String object returned by consumer API
+ //else use the jsonObject
+ if( topicMessage.startsWith("\""))
+ {
+ topicMessage = g.fromJson(topicMessage.toString(), String.class);
+ }
+ object = g.fromJson(topicMessage, LinkedTreeMap.class);
+
+ // Cast the 1st item (since limit=1 and see the type of
+ // object
+ if (object.get("createMirrorMaker") != null) {
+ logger.info("Received createMirrorMaker request from topic");
+ CreateMirrorMaker m = g.fromJson(topicMessage, CreateMirrorMaker.class);
+ createMirrorMaker(m.getCreateMirrorMaker());
+ checkAgentProcess();
+ mirrorMakers.setMessageID(m.getMessageID());
+ publishTopic(g.toJson(mirrorMakers));
+ mirrorMakers.setMessageID("");
+ } else if (object.get("updateMirrorMaker") != null) {
+ logger.info("Received updateMirrorMaker request from topic");
+ UpdateMirrorMaker m = g.fromJson(topicMessage, UpdateMirrorMaker.class);
+ JSONObject json = new JSONObject(topicMessage);
+ JSONObject json2 = (JSONObject) json.get("updateMirrorMaker");
+ if(!json2.has("numStreams")){
+ m.getUpdateMirrorMaker().setNumStreams(0);
+ }
+ updateMirrorMaker(m.getUpdateMirrorMaker());
+ checkAgentProcess();
+ mirrorMakers.setMessageID(m.getMessageID());
+ publishTopic(g.toJson(mirrorMakers));
+ mirrorMakers.setMessageID("");
+ } else if (object.get("deleteMirrorMaker") != null) {
+ logger.info("Received deleteMirrorMaker request from topic");
+ DeleteMirrorMaker m = g.fromJson(topicMessage, DeleteMirrorMaker.class);
+ deleteMirrorMaker(m.getDeleteMirrorMaker());
+ checkAgentProcess();
+ mirrorMakers.setMessageID(m.getMessageID());
+ publishTopic(g.toJson(mirrorMakers));
+ mirrorMakers.setMessageID("");
+ } else if (object.get("listAllMirrorMaker") != null) {
+ logger.info("Received listALLMirrorMaker request from topic");
+ checkAgentProcess();
+ mirrorMakers.setMessageID((String) object.get("messageID"));
+ publishTopic(g.toJson(mirrorMakers));
+ mirrorMakers.setMessageID("");
+ } else if (object.get("updateWhiteList") != null) {
+ logger.info("Received updateWhiteList request from topic");
+ UpdateWhiteList m = g.fromJson(topicMessage, UpdateWhiteList.class);
+ updateWhiteList(m.getUpdateWhiteList());
+ checkAgentProcess();
+ mirrorMakers.setMessageID(m.getMessageID());
+ publishTopic(g.toJson(mirrorMakers));
+ mirrorMakers.setMessageID("");
+ } else if (object.get("listMirrorMaker") != null) {
+ logger.info("Received listMirrorMaker from topic, skipping messages");
+ } else {
+ logger.info("Received unknown request from topic");
+ }
+ } catch (Exception ex) {
+ connectionattempt++;
+ if (connectionattempt > 5) {
+ logger.info("Can't connect to the topic, mmagent shutting down , " + topicMessage);
+ return;
+ }
+ logger.info("Can't connect to the topic, " + topicMessage + " Retrying " + connectionattempt
+ + " of 5 times in 1 minute" + " Error:" + ex.getLocalizedMessage());
+ Thread.sleep(60000);
+ }
+ } else {
+ // Check all MirrorMaker every min
+ connectionattempt = 0;
+ checkAgentProcess();
+ }
+
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ private void createMirrorMaker(MirrorMaker newMirrorMaker) {
+ boolean exists = false;
+ if (mirrorMakers != null) {
+ int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+ for (int i = 0; i < mirrorMakersCount; i++) {
+ MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+ if (mm.name.equals(newMirrorMaker.name)) {
+ exists = true;
+ logger.info("MirrorMaker already exist for:" + newMirrorMaker.name);
+ return;
+ }
+ }
+ }
+ logger.info("Adding new MirrorMaker:" + newMirrorMaker.name);
+ if (exists == false && mirrorMakers != null) {
+ mirrorMakers.getListMirrorMaker().add(newMirrorMaker);
+ } else if (exists == false && mirrorMakers == null) {
+ mirrorMakers = new ListMirrorMaker();
+ ArrayList<MirrorMaker> list = mirrorMakers.getListMirrorMaker();
+ list = new ArrayList<MirrorMaker>();
+ list.add(newMirrorMaker);
+ mirrorMakers.setListMirrorMaker(list);
+ }
+ checkPropertiesFile(newMirrorMaker, "consumer", true);
+ checkPropertiesFile(newMirrorMaker, "producer", true);
+
+ Gson g = new Gson();
+ mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+ OutputStream out = null;
+ try {
+ out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+ mirrorMakerProperties.store(out, "");
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void updateMirrorMaker(MirrorMaker newMirrorMaker) {
+ boolean exists = false;
+ if (mirrorMakers != null) {
+ int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+ for (int i = 0; i < mirrorMakersCount; i++) {
+ MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+ if (mm.name.equals(newMirrorMaker.name)) {
+ exists = true;
+ if(null!=newMirrorMaker.getConsumer())
+ {
+ mm.setConsumer(newMirrorMaker.getConsumer());
+ }
+ if(null!=newMirrorMaker.getProducer())
+ {
+ mm.setProducer(newMirrorMaker.getProducer());
+ }
+ if(newMirrorMaker.getNumStreams()>=1)
+ {
+ mm.setNumStreams(newMirrorMaker.getNumStreams());
+ }
+
+ mm.setEnablelogCheck(newMirrorMaker.enablelogCheck);
+
+ mirrorMakers.getListMirrorMaker().set(i, mm);
+ newMirrorMaker=mm;
+ logger.info("Updating MirrorMaker:" + newMirrorMaker.name);
+ }
+ }
+ }
+ if (exists) {
+ checkPropertiesFile(newMirrorMaker, "consumer", true);
+ checkPropertiesFile(newMirrorMaker, "producer", true);
+
+ Gson g = new Gson();
+ mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+ OutputStream out = null;
+ try {
+ out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+ mirrorMakerProperties.store(out, "");
+ MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ } else {
+ logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
+ }
+ }
+
+ private void updateWhiteList(MirrorMaker newMirrorMaker) {
+ boolean exists = false;
+ if (mirrorMakers != null) {
+ int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+ for (int i = 0; i < mirrorMakersCount; i++) {
+ MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+ if (mm.name.equals(newMirrorMaker.name)) {
+ exists = true;
+ mm.setWhitelist(newMirrorMaker.whitelist);
+ mirrorMakers.getListMirrorMaker().set(i, mm);
+ logger.info("Updating MirrorMaker WhiteList:" + newMirrorMaker.name + " WhiteList:"
+ + newMirrorMaker.whitelist);
+ }
+ }
+ }
+ if (exists) {
+ Gson g = new Gson();
+ mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+ OutputStream out = null;
+ try {
+ out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+ mirrorMakerProperties.store(out, "");
+ MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ } else {
+ logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
+ }
+ }
+
+ private void deleteMirrorMaker(MirrorMaker newMirrorMaker) {
+ boolean exists = false;
+ if (mirrorMakers != null) {
+ int mirrorMakersCount = mirrorMakers.getListMirrorMaker().size();
+ for (int i = 0; i < mirrorMakersCount; i++) {
+ MirrorMaker mm = mirrorMakers.getListMirrorMaker().get(i);
+ if (mm.name.equals(newMirrorMaker.name)) {
+ exists = true;
+ mirrorMakers.getListMirrorMaker().remove(i);
+ logger.info("Removing MirrorMaker:" + newMirrorMaker.name);
+ i = mirrorMakersCount;
+ }
+ }
+ }
+ if (exists) {
+ try {
+ String path = mmagenthome + "/etc/" + newMirrorMaker.name + "consumer" + ".properties";
+ File file = new File(path);
+ file.delete();
+ } catch (Exception ex) {
+ }
+ try {
+ String path = mmagenthome + "/etc/" + newMirrorMaker.name + "producer" + ".properties";
+ File file = new File(path);
+ file.delete();
+ } catch (Exception ex) {
+ }
+ Gson g = new Gson();
+ mirrorMakerProperties.setProperty("mirrormakers", g.toJson(this.mirrorMakers));
+ OutputStream out = null;
+ try {
+ out = new FileOutputStream(mmagenthome + "/etc/mmagent.config");
+ mirrorMakerProperties.store(out, "");
+ MirrorMakerProcessHandler.stopMirrorMaker(newMirrorMaker.name);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ } else {
+ logger.info("MirrorMaker Not found for:" + newMirrorMaker.name);
+ }
+ }
+
+ private void loadProperties() {
+ InputStream input = null;
+ try {
+
+ input = new FileInputStream(mmagenthome + "/etc/mmagent.config");
+ mirrorMakerProperties.load(input);
+ Gson g = new Gson();
+ if (mirrorMakerProperties.getProperty("mirrormakers") == null) {
+ this.mirrorMakers = new ListMirrorMaker();
+ ArrayList<MirrorMaker> list = this.mirrorMakers.getListMirrorMaker();
+ list = new ArrayList<MirrorMaker>();
+ this.mirrorMakers.setListMirrorMaker(list);
+ } else {
+ this.mirrorMakers = g.fromJson(mirrorMakerProperties.getProperty("mirrormakers"),
+ ListMirrorMaker.class);
+ }
+
+ this.kafkahome = mirrorMakerProperties.getProperty("kafkahome");
+ this.topicURL = mirrorMakerProperties.getProperty("topicURL");
+ this.topicname = mirrorMakerProperties.getProperty("topicname");
+ this.mechid = mirrorMakerProperties.getProperty("mechid");
+ this.grepLog= mirrorMakerProperties.getProperty("grepLog");
+
+ BasicTextEncryptor textEncryptor = new BasicTextEncryptor();
+ textEncryptor.setPassword(secret);
+ this.password = textEncryptor.decrypt(mirrorMakerProperties.getProperty("password"));
+ } catch (IOException ex) {
+ // ex.printStackTrace();
+ } finally {
+ if (input != null) {
+ try {
+ input.close();
+ } catch (IOException e) {
+ // e.printStackTrace();
+ }
+ }
+ }
+
+ }
+*/}
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/CreateMirrorMaker.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/CreateMirrorMaker.java
new file mode 100644
index 0000000..4d1426a
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/CreateMirrorMaker.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.dao;
+
+public class CreateMirrorMaker {
+ String messageID;
+ MirrorMaker MirrorMaker;
+
+ public MirrorMaker getCreateMirrorMaker() {
+ return MirrorMaker;
+ }
+
+ public void setCreateMirrorMaker(MirrorMaker createMirrorMaker) {
+ this.MirrorMaker = createMirrorMaker;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/DeleteMirrorMaker.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/DeleteMirrorMaker.java
new file mode 100644
index 0000000..a76e508
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/DeleteMirrorMaker.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.dao;
+
+public class DeleteMirrorMaker {
+ String messageID;
+ MirrorMaker deleteMirrorMaker;
+
+ public MirrorMaker getDeleteMirrorMaker() {
+ return deleteMirrorMaker;
+ }
+
+ public void setDeleteMirrorMaker(MirrorMaker deleteMirrorMaker) {
+ this.deleteMirrorMaker = deleteMirrorMaker;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/ListMirrorMaker.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/ListMirrorMaker.java
new file mode 100644
index 0000000..8176a1e
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/ListMirrorMaker.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.dao;
+
+import java.util.ArrayList;
+
+public class ListMirrorMaker {
+ String messageID;
+ ArrayList<MirrorMaker> listMirrorMaker;
+
+ public ArrayList<MirrorMaker> getListMirrorMaker() {
+ return listMirrorMaker;
+ }
+
+ public void setListMirrorMaker(ArrayList<MirrorMaker> createMirrorMaker) {
+ this.listMirrorMaker = createMirrorMaker;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/MirrorMaker.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/MirrorMaker.java
new file mode 100644
index 0000000..33387f7
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/MirrorMaker.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.dao;
+
+public class MirrorMaker {
+ public String name;
+ public String consumer;
+ public String producer;
+ public String whitelist;
+ public String status;
+ public int numStreams=1;
+ public boolean enablelogCheck = false;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(String consumer) {
+ this.consumer = consumer;
+ }
+
+ public String getProducer() {
+ return producer;
+ }
+
+ public void setProducer(String producer) {
+ this.producer = producer;
+ }
+
+ public String getWhitelist() {
+ return whitelist;
+ }
+
+ public void setWhitelist(String whitelist) {
+ this.whitelist = whitelist;
+ }
+
+ public int getNumStreams() {
+ return numStreams;
+ }
+
+ public void setNumStreams(int numStreams) {
+ this.numStreams = numStreams;
+ }
+
+ public boolean isEnablelogCheck() {
+ return enablelogCheck;
+ }
+
+ public void setEnablelogCheck(boolean enablelogCheck) {
+ this.enablelogCheck = enablelogCheck;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateMirrorMaker.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateMirrorMaker.java
new file mode 100644
index 0000000..d31cf85
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateMirrorMaker.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.dao;
+
+public class UpdateMirrorMaker {
+ String messageID;
+ MirrorMaker updateMirrorMaker;
+
+ public MirrorMaker getUpdateMirrorMaker() {
+ return updateMirrorMaker;
+ }
+
+ public void setUpdateMirrorMaker(MirrorMaker updateMirrorMaker) {
+ this.updateMirrorMaker = updateMirrorMaker;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateWhiteList.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateWhiteList.java
new file mode 100644
index 0000000..9e808f9
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/dao/UpdateWhiteList.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.dao;
+
+public class UpdateWhiteList {
+ String messageID;
+ MirrorMaker updateWhiteList;
+
+ public MirrorMaker getUpdateWhiteList() {
+ return updateWhiteList;
+ }
+
+ public void setUpdateWhiteList(MirrorMaker updateWhiteList) {
+ this.updateWhiteList = updateWhiteList;
+ }
+
+ public String getMessageID() {
+ return messageID;
+ }
+
+ public void setMessageID(String messageID) {
+ this.messageID = messageID;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/utils/MirrorMakerProcessHandler.java b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/utils/MirrorMakerProcessHandler.java
new file mode 100644
index 0000000..82086a3
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/mr/dmaapMMAgent/utils/MirrorMakerProcessHandler.java
@@ -0,0 +1,200 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.mr.dmaapMMAgent.utils;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.apache.log4j.Logger;
+
+public class MirrorMakerProcessHandler {/*
+ static final Logger logger = Logger.getLogger(MirrorMakerProcessHandler.class);
+ static String mmagenthome = System.getProperty("MMAGENTHOME");
+
+ public static boolean checkMirrorMakerProcess(String agentname, boolean enablelogCheck, String grepLog) throws Exception {
+ String line,linelog;
+ try {
+ Runtime rt = Runtime.getRuntime();
+ Process mmprocess = null;
+
+ if (System.getProperty("os.name").contains("Windows")) {
+ String args = "";
+ args = "wmic.exe process where \"commandline like '%agentname=" + agentname
+ + "~%' and caption='java.exe'\"";
+ mmprocess = rt.exec(args);
+ } else {
+ //String args[] = { "/bin/sh", "-c", "ps -ef |grep java |grep agentname=" + agentname + "~" };
+
+ String args[] = { "/bin/sh", "-c", "ps -ef | grep `ps -ef |grep agentname=" + agentname + "~ | egrep -v 'grep|java' | awk '{print $2}' `| egrep -v '/bin/sh|grep' "};
+ logger.info("CheckMM process->"+args[2]);
+ mmprocess = rt.exec(args);
+ }
+
+ InputStream is = mmprocess.getInputStream();
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ // if (line.contains("agentname=" + agentname) && line.contains("/bin/sh -c") == false) {
+
+ //If enablelogCheck Check MirrorMaker log for errors and restart mirrormaker
+ if(enablelogCheck) {
+ logger.info("Check if MM log contains any errors");
+ String args2[];
+ args2 = new String[] { "/bin/sh", "-c", "grep -i ERROR "+ mmagenthome + "/logs/" + agentname + "_MMaker.log"};
+ if(null!=grepLog && !grepLog.isEmpty())
+ {
+ args2 = new String[]{ "/bin/sh", "-c", grepLog +" " + mmagenthome + "/logs/" + agentname + "_MMaker.log"};
+ }
+ logger.info("Grep log args-- "+args2[2]);
+ mmprocess = rt.exec(args2);
+ InputStream islog = mmprocess.getInputStream();
+ InputStreamReader isrlog = new InputStreamReader(islog);
+ BufferedReader brlog = new BufferedReader(isrlog);
+
+ while ((linelog = brlog.readLine()) != null)
+ {
+ logger.info("Error from MM log--"+linelog);
+
+ if (linelog.toLowerCase().contains("ERROR".toLowerCase()) ||
+ linelog.toLowerCase().contains("Issue".toLowerCase()) )
+ {
+ logger.info("MM log contains error Stop MM and restart");
+ stopMirrorMaker(agentname);
+ isrlog.close();
+ brlog.close();
+ return false;
+ }
+
+
+ }
+ isrlog.close();
+ brlog.close();
+ }
+
+ return true;
+ // }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return false;
+ }
+
+ public static void stopMirrorMaker(String agentname) {
+ try {
+ Runtime rt = Runtime.getRuntime();
+ Process killprocess = null;
+
+ if (System.getProperty("os.name").contains("Windows")) {
+ String args = "wmic.exe process where \"commandline like '%agentname=" + agentname
+ + "~%' and caption='java.exe'\" call terminate";
+ killprocess = rt.exec(args);
+ } else {
+ //String args[] = { "/bin/sh", "-c",
+ // "kill -9 $(ps -ef |grep java |grep agentname=" + agentname + "~| awk '{print $2}')" };
+
+ //String args[] = { "/bin/sh", "-c",
+ // "kill -9 `ps -ef |grep agentname=" + agentname + "~| egrep -v 'grep|java' | awk '{print $2}'` | egrep -v '/bin/sh|grep'"};
+ String args[] = { "/bin/sh", "-c",
+ "for i in `ps -ef |grep agentname="+ agentname + "~ | egrep -v 'grep|java' | awk '{print $2}'`;do kill -9 `ps -eaf | grep $i | egrep -v '/bin/sh|grep' | awk '{print $2}'` ;done"};
+ logger.info ("Stop MM ->"+args[2]);
+ // args = "kill $(ps -ef |grep java |grep agentname=" +
+ // agentname + "~| awk '{print $2}')";
+ killprocess = rt.exec(args);
+ }
+
+ InputStream is = killprocess.getInputStream();
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ while ((line = br.readLine()) != null) {
+ // System.out.println(line);
+ }
+
+ logger.info("Mirror Maker " + agentname + " Stopped");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ public static void startMirrorMaker(String mmagenthome, String kafkaHome, String agentName, String consumerConfig,
+ String producerConfig, int numStreams, String whitelist) {
+ try {
+ Runtime rt = Runtime.getRuntime();
+
+ if (System.getProperty("os.name").contains("Windows")) {
+ String args = kafkaHome + "/bin/windows/kafka-run-class.bat -Dagentname=" + agentName
+ + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig + " --producer.config "
+ + producerConfig +" --num.streams " + numStreams + " --abort.on.send.failure true" +" --whitelist '" + whitelist + "' > " + mmagenthome + "/logs/" + agentName
+ + "_MMaker.log";
+ final Process process = rt.exec(args);
+ new Thread() {
+ @Override
+ public void run() {
+ try {
+ InputStream is = process.getInputStream();
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ while ((line = br.readLine()) != null) {
+ // System.out.println(line);
+ }
+ } catch (Exception anExc) {
+ anExc.printStackTrace();
+ }
+ }
+ }.start();
+ } else {
+ String args[] = { "/bin/sh", "-c",
+ kafkaHome + "/bin/kafka-run-class.sh -Dagentname=" + agentName
+ + "~ kafka.tools.MirrorMaker --consumer.config " + consumerConfig
+ + " --producer.config " + producerConfig + " --num.streams " + numStreams + " --abort.on.send.failure true" + " --whitelist '" + whitelist + "' >"
+ + mmagenthome + "/logs/" + agentName + "_MMaker.log 2>&1" };
+ final Process process = rt.exec(args);
+ new Thread() {
+ public void run() {
+ try {
+ InputStream is = process.getInputStream();
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+ String line;
+ while ((line = br.readLine()) != null) {
+ // System.out.println(line);
+ }
+ } catch (Exception anExc) {
+ anExc.printStackTrace();
+ }
+ }
+ }.start();
+ }
+
+ logger.info("Mirror Maker " + agentName + " Started" + " WhiteListing:" + whitelist);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+*/}