diff options
Diffstat (limited to 'datarouter-prov/src')
9 files changed, 524 insertions, 157 deletions
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java index 7475b6b9..ef106ab4 100755 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java @@ -242,7 +242,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { private static String provName = "feeds-drtr.web.att.com"; /** - * The standard FQDN of the ACTIVE provisioning server in this Data Router ecosystem + * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem */ private static String activeProvName = "feeds-drtr.web.att.com"; @@ -719,7 +719,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { } /** - * Gets the FQDN of the initially ACTIVE provisioning server (POD). Note: this used to be called isActivePOD(), + * Gets the FQDN of the initially ACTIVE_POD provisioning server (POD). Note: this used to be called isActivePOD(), * however, that is a misnomer, as the active status could shift to the standby POD without these parameters * changing. Hence, the function names have been changed to more accurately reflect their purpose. * @@ -730,7 +730,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { } /** - * Gets the FQDN of the initially STANDBY provisioning server (POD). Note: this used to be called isStandbyPOD(), + * Gets the FQDN of the initially STANDBY_POD provisioning server (POD). Note: this used to be called isStandbyPOD(), * however, that is a misnomer, as the standby status could shift to the active POD without these parameters * changing. Hence, the function names have been changed to more accurately reflect their purpose. * diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java index 8afa18a1..6cb8520d 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java @@ -86,8 +86,8 @@ public class Poker extends TimerTask { try {
thisPod = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
- thisPod = "*UNKNOWN*"; // not a major problem
- logger.info("UnknownHostException: Setting thisPod to \"*UNKNOWN*\"", e);
+ thisPod = "*UNKNOWN_POD*"; // not a major problem
+ logger.info("UnknownHostException: Setting thisPod to \"*UNKNOWN_POD*\"", e);
}
provString = buildProvisioningString();
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java index 77917771..75423602 100755 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java @@ -146,7 +146,7 @@ public class ProxyServlet extends BaseServlet { */ public boolean isProxyServer() { SynchronizerTask st = SynchronizerTask.getSynchronizer(); - return st.getState() == SynchronizerTask.STANDBY; + return st.getPodState() == SynchronizerTask.STANDBY_POD; } /** diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java index d3ae4fe3..8c5a49a4 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java @@ -24,6 +24,11 @@ package org.onap.dmaap.datarouter.provisioning; +import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; @@ -50,8 +55,6 @@ import java.util.TreeSet; import javax.servlet.http.HttpServletResponse; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; @@ -85,7 +88,7 @@ import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities; * <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to * the active (master) POD.</li> * <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li> - * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN) + * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD) * of this POD.</li> * </ol> * <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p> @@ -99,167 +102,152 @@ import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities; public class SynchronizerTask extends TimerTask { /** - * This is a singleton -- there is only one SynchronizerTask object in the server + * This is a singleton -- there is only one SynchronizerTask object in the server. */ private static SynchronizerTask synctask; /** - * This POD is unknown -- not on the list of PODs + * This POD is unknown -- not on the list of PODs. */ - public static final int UNKNOWN = 0; + public static final int UNKNOWN_POD = 0; /** - * This POD is active -- on the list of PODs, and the DNS CNAME points to us + * This POD is active -- on the list of PODs, and the DNS CNAME points to us. */ - public static final int ACTIVE = 1; + public static final int ACTIVE_POD = 1; /** - * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us + * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us. */ - public static final int STANDBY = 2; - private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"}; + public static final int STANDBY_POD = 2; + + private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"}; private static final long ONE_HOUR = 60 * 60 * 1000L; + private long nextMsg = 0; // only display the "Current podState" msg every 5 mins. + private final EELFLogger logger; private final Timer rolex; private final String spooldir; - private int state; + private int podState; private boolean doFetch; private long nextsynctime; private AbstractHttpClient httpclient = null; - /** - * Get the singleton SynchronizerTask object. - * - * @return the SynchronizerTask - */ - public static synchronized SynchronizerTask getSynchronizer() { - if (synctask == null) { - synctask = new SynchronizerTask(); - } - return synctask; - } - @SuppressWarnings("deprecation") private SynchronizerTask() { logger = EELFManager.getInstance().getLogger("InternalLog"); rolex = new Timer(); spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir"); - state = UNKNOWN; + podState = UNKNOWN_POD; doFetch = true; // start off with a fetch nextsynctime = 0; - logger.info("PROV5000: Sync task starting, server state is UNKNOWN"); + logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD"); try { Properties props = (new DB()).getProperties(); String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks"); String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY); String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY); KeyStore keyStore = KeyStore.getInstance(type); - try(FileInputStream instream = new FileInputStream(new File(store))) { + try (FileInputStream instream = new FileInputStream(new File(store))) { keyStore.load(instream, pass.toCharArray()); } - store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY); - pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY); - KeyStore trustStore = null; - if (store != null && store.length() > 0) { - trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); - try(FileInputStream instream = new FileInputStream(new File(store))){ - trustStore.load(instream, pass.toCharArray()); + store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY); + pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY); + KeyStore trustStore = null; + if (store != null && store.length() > 0) { + trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + try (FileInputStream instream = new FileInputStream(new File(store))) { + trustStore.load(instream, pass.toCharArray()); - } } + } // We are connecting with the node name, but the certificate will have the CNAME // So we need to accept a non-matching certificate name - String keystorepass = props.getProperty( - Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref - try(AbstractHttpClient hc = new DefaultHttpClient()) { - SSLSocketFactory socketFactory = - (trustStore == null) - ? new SSLSocketFactory(keyStore, keystorepass) - : new SSLSocketFactory(keyStore, keystorepass, trustStore); - socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); - Scheme sch = new Scheme("https", 443, socketFactory); - hc.getConnectionManager().getSchemeRegistry().register(sch); - httpclient = hc; - } - // Run once every 5 seconds to check DNS, etc. - long interval = 0; - try { - String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000"); - interval = Long.parseLong(s); - } catch (NumberFormatException e) { - interval = 5000L; + String keystorepass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY); + try (AbstractHttpClient hc = new DefaultHttpClient()) { + SSLSocketFactory socketFactory = + (trustStore == null) + ? new SSLSocketFactory(keyStore, keystorepass) + : new SSLSocketFactory(keyStore, keystorepass, trustStore); + socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + Scheme sch = new Scheme("https", 443, socketFactory); + hc.getConnectionManager().getSchemeRegistry().register(sch); + httpclient = hc; } - rolex.scheduleAtFixedRate(this, 0L, interval); + setSynchTimer(props); } catch (Exception e) { logger.warn("PROV5005: Problem starting the synchronizer: " + e); } } + private void setSynchTimer(Properties props) { + // Run once every 5 seconds to check DNS, etc. + long interval; + try { + String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000"); + interval = Long.parseLong(s); + } catch (NumberFormatException e) { + interval = 5000L; + } + rolex.scheduleAtFixedRate(this, 0L, interval); + } + + /** + * Get the singleton SynchronizerTask object. + * + * @return the SynchronizerTask + */ + public static synchronized SynchronizerTask getSynchronizer() { + if (synctask == null) { + synctask = new SynchronizerTask(); + } + return synctask; + } + /** - * What is the state of this POD? + * What is the podState of this POD?. * - * @return one of ACTIVE, STANDBY, UNKNOWN + * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD */ - public int getState() { - return state; + public int getPodState() { + return podState; } /** - * Is this the active POD? + * Is this the active POD?. * * @return true if we are active (the master), false otherwise */ public boolean isActive() { - return state == ACTIVE; + return podState == ACTIVE_POD; } /** * This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we * should re-synchronize with the master. */ - public void doFetch() { + void doFetch() { doFetch = true; } /** * Runs once a minute in order to <ol> * <li>lookup DNS names,</li> - * <li>determine the state of this POD,</li> - * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li> + * <li>determine the podState of this POD,</li> + * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li> * <li>if this is a standby POD, check if there are any new log records to be replicated.</li> - * </ol> + * </ol>. */ @Override public void run() { try { - state = lookupState(); - if (state == STANDBY) { + podState = lookupState(); + if (podState == STANDBY_POD) { // Only copy provisioning data FROM the active server TO the standby if (doFetch || (System.currentTimeMillis() >= nextsynctime)) { - logger.debug("Initiating a sync..."); - JSONObject jo = readProvisioningJSON(); - if (jo != null) { - doFetch = false; - syncFeeds(jo.getJSONArray("feeds")); - syncSubs(jo.getJSONArray("subscriptions")); - syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610 - syncParams(jo.getJSONObject("parameters")); - // The following will not be present in a version=1.0 provfeed - JSONArray ja = jo.optJSONArray("ingress"); - if (ja != null) { - syncIngressRoutes(ja); - } - JSONObject j2 = jo.optJSONObject("egress"); - if (j2 != null) { - syncEgressRoutes(j2); - } - ja = jo.optJSONArray("routing"); - if (ja != null) { - syncNetworkRoutes(ja); - } - } + syncProvisioningData(); logger.info("PROV5013: Sync completed."); nextsynctime = System.currentTimeMillis() + ONE_HOUR; } @@ -278,7 +266,7 @@ public class SynchronizerTask extends TimerTask { remote.andNot(local); if (!remote.isEmpty()) { logger.debug(" Replicating logs: " + remote); - replicateDRLogs(remote); + replicateDataRouterLogs(remote); } } } catch (Exception e) { @@ -286,14 +274,39 @@ public class SynchronizerTask extends TimerTask { } } + private void syncProvisioningData() { + logger.debug("Initiating a sync..."); + JSONObject jo = readProvisioningJson(); + if (jo != null) { + doFetch = false; + syncFeeds(jo.getJSONArray("feeds")); + syncSubs(jo.getJSONArray("subscriptions")); + syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610 + syncParams(jo.getJSONObject("parameters")); + // The following will not be present in a version=1.0 provfeed + JSONArray ja = jo.optJSONArray("ingress"); + if (ja != null) { + syncIngressRoutes(ja); + } + JSONObject j2 = jo.optJSONObject("egress"); + if (j2 != null) { + syncEgressRoutes(j2); + } + ja = jo.optJSONArray("routing"); + if (ja != null) { + syncNetworkRoutes(ja); + } + } + } + /** - * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2 - * (STANDBY) to indicate the state of this server. + * This method is used to lookup the CNAME that points to the active server. + * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server. * - * @return the current state + * @return the current podState */ - private int lookupState() { - int newstate = UNKNOWN; + int lookupState() { + int newPodState = UNKNOWN_POD; try { InetAddress myaddr = InetAddress.getLocalHost(); if (logger.isTraceEnabled()) { @@ -303,9 +316,9 @@ public class SynchronizerTask extends TimerTask { Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods())); if (pods.contains(thisPod)) { InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName()); - newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY; + newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD; if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) { - logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]); + logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]); nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L); } } else { @@ -315,15 +328,13 @@ public class SynchronizerTask extends TimerTask { logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e); } - if (newstate != state) { - logger - .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate])); + if (newPodState != podState) { + logger.info(String.format("PROV5001: Server podState changed from %s to %s", + stnames[podState], stnames[newPodState])); } - return newstate; + return newPodState; } - private static long nextMsg = 0; // only display the "Current state" msg every 5 mins. - /** * Synchronize the Feeds in the JSONArray, with the Feeds in the DB. */ @@ -396,7 +407,7 @@ public class SynchronizerTask extends TimerTask { try { v = "" + jo.getInt(k); } catch (JSONException e1) { - logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e); + logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1); JSONArray ja = jo.getJSONArray(k); for (int i = 0; i < ja.length(); i++) { if (i > 0) { @@ -455,7 +466,7 @@ public class SynchronizerTask extends TimerTask { NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n)); coll.add(nr); } catch (JSONException e) { - logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n)); + logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e); } } if (sync(coll, NetworkRoute.getAllNetworkRoutes())) { @@ -477,29 +488,11 @@ public class SynchronizerTask extends TimerTask { Syncable newobj = newmap.get(n); Syncable oldobj = oldmap.get(n); if (oldobj == null) { - if (logger.isDebugEnabled()) { - logger.debug(" Inserting record: " + newobj); - } - newobj.doInsert(conn); - changes = true; + changes = insertRecord(conn, newobj); } else if (newobj == null) { - if (logger.isDebugEnabled()) { - logger.debug(" Deleting record: " + oldobj); - } - oldobj.doDelete(conn); - changes = true; + changes = deleteRecord(conn, oldobj); } else if (!newobj.equals(oldobj)) { - if (logger.isDebugEnabled()) { - logger.debug(" Updating record: " + newobj); - } - newobj.doUpdate(conn); - - /**Rally US708115 - * Change Ownership of FEED - 1610, Syncronised with secondary DB. - * */ - checkChnageOwner(newobj, oldobj); - - changes = true; + changes = updateRecord(conn, newobj, oldobj); } } db.release(conn); @@ -509,6 +502,30 @@ public class SynchronizerTask extends TimerTask { return changes; } + private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) { + if (logger.isDebugEnabled()) { + logger.debug(" Updating record: " + newobj); + } + boolean changes = newobj.doUpdate(conn); + checkChangeOwner(newobj, oldobj); + + return changes; + } + + private boolean deleteRecord(Connection conn, Syncable oldobj) { + if (logger.isDebugEnabled()) { + logger.debug(" Deleting record: " + oldobj); + } + return oldobj.doDelete(conn); + } + + private boolean insertRecord(Connection conn, Syncable newobj) { + if (logger.isDebugEnabled()) { + logger.debug(" Inserting record: " + newobj); + } + return newobj.doInsert(conn); + } + private Map<String, Syncable> getMap(Collection<? extends Syncable> c) { Map<String, Syncable> map = new HashMap<>(); for (Syncable v : c) { @@ -517,18 +534,18 @@ public class SynchronizerTask extends TimerTask { return map; } - /**Change owner of FEED/SUBSCRIPTION*/ /** + * Change owner of FEED/SUBSCRIPTION. * Rally US708115 Change Ownership of FEED - 1610 */ - private void checkChnageOwner(Syncable newobj, Syncable oldobj) { + private void checkChangeOwner(Syncable newobj, Syncable oldobj) { if (newobj instanceof Feed) { Feed oldfeed = (Feed) oldobj; Feed newfeed = (Feed) newobj; if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) { - logger.info("PROV5013 - Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed - .getPublisher()); + logger.info("PROV5013 - Previous publisher: " + + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher()); oldfeed.setPublisher(newfeed.getPublisher()); oldfeed.changeOwnerShip(); } @@ -537,8 +554,8 @@ public class SynchronizerTask extends TimerTask { Subscription newsub = (Subscription) newobj; if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) { - logger.info("PROV5013 - Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub - .getSubscriber()); + logger.info("PROV5013 - Previous subscriber: " + + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber()); oldsub.setSubscriber(newsub.getSubscriber()); oldsub.changeOwnerShip(); } @@ -551,26 +568,26 @@ public class SynchronizerTask extends TimerTask { * * @return the provisioning data (as a JONObject) */ - private synchronized JSONObject readProvisioningJSON() { + private synchronized JSONObject readProvisioningJson() { String url = URLUtilities.generatePeerProvURL(); HttpGet get = new HttpGet(url); try { HttpResponse response = httpclient.execute(get); int code = response.getStatusLine().getStatusCode(); if (code != HttpServletResponse.SC_OK) { - logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code); + logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code); return null; } HttpEntity entity = response.getEntity(); String ctype = entity.getContentType().getValue().trim(); - if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype - .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) { - logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype); + if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) + && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) { + logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype); return null; } return new JSONObject(new JSONTokener(entity.getContent())); } catch (Exception e) { - logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e); + logger.warn("PROV5012: readProvisioningJson failed, exception: " + e); return null; } finally { get.releaseConnection(); @@ -583,7 +600,7 @@ public class SynchronizerTask extends TimerTask { * * @return the bitset */ - private RLEBitSet readRemoteLoglist() { + RLEBitSet readRemoteLoglist() { RLEBitSet bs = new RLEBitSet(); String url = URLUtilities.generatePeerLogsURL(); @@ -594,7 +611,7 @@ public class SynchronizerTask extends TimerTask { //End of fix. HttpGet get = new HttpGet(url); - try { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) { HttpResponse response = httpclient.execute(get); int code = response.getStatusLine().getStatusCode(); if (code != HttpServletResponse.SC_OK) { @@ -603,13 +620,12 @@ public class SynchronizerTask extends TimerTask { } HttpEntity entity = response.getEntity(); String ctype = entity.getContentType().getValue().trim(); - if (!"text/plain".equals(ctype)) { + if (!TEXT_CT.equals(ctype)) { logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype); return bs; } InputStream is = entity.getContent(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - int ch = 0; + int ch; while ((ch = is.read()) >= 0) { bos.write(ch); } @@ -630,12 +646,12 @@ public class SynchronizerTask extends TimerTask { * * @param bs the bitset (an RELBitSet) of log records to fetch */ - private void replicateDRLogs(RLEBitSet bs) { + void replicateDataRouterLogs(RLEBitSet bs) { String url = URLUtilities.generatePeerLogsURL(); HttpPost post = new HttpPost(url); try { String t = bs.toString(); - HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain")); + HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create(TEXT_CT)); post.setEntity(body); if (logger.isDebugEnabled()) { logger.debug("Requesting records: " + t); @@ -644,13 +660,13 @@ public class SynchronizerTask extends TimerTask { HttpResponse response = httpclient.execute(post); int code = response.getStatusLine().getStatusCode(); if (code != HttpServletResponse.SC_OK) { - logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code); + logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code); return; } HttpEntity entity = response.getEntity(); String ctype = entity.getContentType().getValue().trim(); - if (!"text/plain".equals(ctype)) { - logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype); + if (!TEXT_CT.equals(ctype)) { + logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype); return; } @@ -661,7 +677,7 @@ public class SynchronizerTask extends TimerTask { Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING); logger.info("Approximately " + bs.cardinality() + " records replicated."); } catch (Exception e) { - logger.warn("PROV5012: replicateDRLogs failed, exception: " + e); + logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e); } finally { post.releaseConnection(); } diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java index bad6e2cb..e2076b9d 100644 --- a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java +++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java @@ -50,7 +50,7 @@ public class DrServletTestBase { FieldUtils.writeDeclaredStaticField(DB.class, "props", props, true); FieldUtils.writeDeclaredStaticField(BaseServlet.class, "startmsgFlag", false, true); SynchronizerTask synchronizerTask = mock(SynchronizerTask.class); - when(synchronizerTask.getState()).thenReturn(SynchronizerTask.UNKNOWN); + when(synchronizerTask.getPodState()).thenReturn(SynchronizerTask.UNKNOWN_POD); FieldUtils.writeDeclaredStaticField(BaseServlet.class, "synctask", synchronizerTask, true); } diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java new file mode 100755 index 00000000..79d83899 --- /dev/null +++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java @@ -0,0 +1,212 @@ +/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + +package org.onap.dmaap.datarouter.provisioning; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +import com.att.eelf.configuration.EELFManager; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.http.HttpEntity; +import org.apache.http.StatusLine; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.impl.client.AbstractHttpClient; +import org.apache.http.message.BasicHeader; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader; +import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet; +import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +@PowerMockIgnore("javax.net.ssl.*") +@PrepareForTest({BaseServlet.class, URLUtilities.class}) +public class SynchronizerTaskTest { + + @Mock + private AbstractHttpClient httpClient; + + @Mock + private HttpEntity httpEntity; + + @Mock + private StatusLine statusLine; + + @Mock + private CloseableHttpResponse response; + + @Mock + private ByteArrayOutputStream byteArrayOutputStream; + + private SynchronizerTask synchronizerTask; + + private ExecutorService executorService; + + private static EntityManagerFactory emf; + private static EntityManager em; + + @BeforeClass + public static void init() { + emf = Persistence.createEntityManagerFactory("dr-unit-tests"); + em = emf.createEntityManager(); + System.setProperty( + "org.onap.dmaap.datarouter.provserver.properties", + "src/test/resources/h2Database.properties"); + } + + @AfterClass + public static void tearDownClass() { + em.clear(); + em.close(); + emf.close(); + } + + + @Before + public void setUp() throws IllegalAccessException, UnknownHostException { + SSLSocketFactory sslSocketFactory = mock(SSLSocketFactory.class); + doNothing().when(sslSocketFactory).setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + + PowerMockito.mockStatic(BaseServlet.class); + PowerMockito.mockStatic(URLUtilities.class); + when(BaseServlet.getPods()).thenReturn(new String[] {InetAddress.getLocalHost().getHostName(), "stand-by-prov"}); + when(URLUtilities.generatePeerProvURL()).thenReturn("https://stand-by-prov/internal/prov"); + when(URLUtilities.generatePeerLogsURL()).thenReturn("https://stand-by-prov/internal/drlogs"); + + synchronizerTask = Mockito.spy(SynchronizerTask.getSynchronizer()); + doReturn(2).when(synchronizerTask).lookupState(); + + executorService = Executors.newSingleThreadExecutor(); + executorService.execute(synchronizerTask); + } + + @After + public void tearDown() throws InterruptedException { + executorService.shutdown(); + executorService.awaitTermination(2, TimeUnit.SECONDS); + } + + @Test + public void Given_Synch_Task_readRemoteLoglist_Called_And_Valid_BitSet_Returned_Success() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200); + Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "text/plain")); + Mockito.when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("1-55251".getBytes())); + RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist(); + Assert.assertNotNull(rleBitSet); + } + + @Test + public void Given_Synch_Task_readRemoteLoglist_Called_And_Invalid_Resonse_Code_Failure() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(404); + RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist(); + Assert.assertNotNull(rleBitSet); + } + + @Test + public void Given_Synch_Task_readRemoteLoglist_Called_And_Invalid_Content_Type_Failure() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200); + Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "invalid_content_type")); + RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist(); + Assert.assertNotNull(rleBitSet); + } + + @Test + public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Valid_BitSet_Returned_Success() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200); + Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "text/plain")); + RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist(); + synchronizerTask.replicateDataRouterLogs(rleBitSet); + } + + @Test + public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Invalid_Content_Type_Failure() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200); + Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "invalid_content_type")); + RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist(); + synchronizerTask.replicateDataRouterLogs(rleBitSet); + } + + @Test + public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Invalid_Resonse_Code_Failure() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(404); + RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist(); + synchronizerTask.replicateDataRouterLogs(rleBitSet); + } + + @Test + public void Given_Synch_Task_Is_Started_And_LogFileLoader_Is_Idle_Then_Standby_Pod_Synch_Is_Successful() throws Exception { + mockHttpClientForGetRequest(); + Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200); + Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "application/vnd.dmaap-dr.provfeed-full; version=1.0")); + mockResponseFromGet(); + } + + + private void mockHttpClientForGetRequest() throws Exception { + FieldUtils.writeField(synchronizerTask, "httpclient", httpClient, true); + Mockito.when(httpClient.execute(anyObject())).thenReturn(response); + Mockito.when(response.getEntity()).thenReturn(httpEntity); + Mockito.when(response.getStatusLine()).thenReturn(statusLine); + + } + + private void mockResponseFromGet() throws IOException { + InputStream in = getClass().getClassLoader().getResourceAsStream("prov_data.json"); + Mockito.when(httpEntity.getContent()).thenReturn(in); + } +} diff --git a/datarouter-prov/src/test/resources/create.sql b/datarouter-prov/src/test/resources/create.sql index 7c106723..a811847c 100755 --- a/datarouter-prov/src/test/resources/create.sql +++ b/datarouter-prov/src/test/resources/create.sql @@ -186,6 +186,9 @@ insert into INGRESS_ROUTES(SEQUENCE, FEEDID , USERID, SUBNET, NODESET) VALUES (2,1,'user',null,2); insert into NODESETS(SETID, NODEID) +VALUES (1,1); + +insert into NODESETS(SETID, NODEID) VALUES (2,2); insert into LOG_RECORDS(RECORD_ID,TYPE,EVENT_TIME,PUBLISH_ID,FEEDID,REQURI,METHOD,CONTENT_TYPE,CONTENT_LENGTH,FEED_FILEID,REMOTE_ADDR,USER,STATUS,DELIVERY_SUBID,DELIVERY_FILEID,RESULT,ATTEMPTS,REASON,FILENAME) diff --git a/datarouter-prov/src/test/resources/h2Database.properties b/datarouter-prov/src/test/resources/h2Database.properties index fee9c688..9c63aea4 100755 --- a/datarouter-prov/src/test/resources/h2Database.properties +++ b/datarouter-prov/src/test/resources/h2Database.properties @@ -29,4 +29,11 @@ org.onap.dmaap.datarouter.provserver.https.relaxation = false org.onap.dmaap.datarouter.provserver.accesslog.dir = unit-test-logs org.onap.dmaap.datarouter.provserver.spooldir = unit-test-logs/spool org.onap.dmaap.datarouter.provserver.localhost = 127.0.0.1 -org.onap.dmaap.datarouter.provserver.passwordencryption = PasswordEncryptionKey#@$%^&1234#
\ No newline at end of file +org.onap.dmaap.datarouter.provserver.passwordencryption = PasswordEncryptionKey#@$%^&1234# + +org.onap.dmaap.datarouter.provserver.keystore.type = jks +org.onap.dmaap.datarouter.provserver.keymanager.password = FZNkU,B%NJzcT1v7;^v]M#ZX +org.onap.dmaap.datarouter.provserver.keystore.path = aaf_certs/org.onap.dmaap-dr.jks +org.onap.dmaap.datarouter.provserver.keystore.password = FZNkU,B%NJzcT1v7;^v]M#ZX +org.onap.dmaap.datarouter.provserver.truststore.path = aaf_certs/org.onap.dmaap-dr.trust.jks +org.onap.dmaap.datarouter.provserver.truststore.password = +mzf@J.D^;3!![*Xr.z$c#?b
\ No newline at end of file diff --git a/datarouter-prov/src/test/resources/prov_data.json b/datarouter-prov/src/test/resources/prov_data.json new file mode 100644 index 00000000..32536316 --- /dev/null +++ b/datarouter-prov/src/test/resources/prov_data.json @@ -0,0 +1,129 @@ +{ + "feeds": [ + { + "suspend": false, + "groupid": 0, + "description": "Default feed provisioned for PM File collector", + "version": "m1.0", + "authorization": { + "endpoint_addrs": [ + + ], + "classification": "unclassified", + "endpoint_ids": [ + { + "password": "dradmin", + "id": "dradmin" + } + ] + }, + "last_mod": 1560871903000, + "deleted": false, + "feedid": 1, + "name": "Default PM Feed", + "business_description": "Default Feed", + "aaf_instance": "legacy", + "publisher": "dradmin", + "links": { + "subscribe": "https://dmaap-dr-prov/subscribe/1", + "log": "https://dmaap-dr-prov/feedlog/1", + "publish": "https://dmaap-dr-prov/publish/1", + "self": "https://dmaap-dr-prov/feed/1" + }, + "created_date": 1560871903000 + } + ], + "groups": [ + { + "authid": "GROUP-0000-c2754bb7-92ef-4869-9c6b-1bc1283be4c0", + "name": "Test Group", + "description": "Test Description of Group .", + "classification": "publisher/subscriber", + "members": "{id=attuid, name=User1}, {id=attuid, name=User 2]" + } + ], + "subscriptions": [ + { + "suspend": false, + "delivery": { + "use100": true, + "password": "PASSWORD", + "user": "LOGIN", + "url": "https://dcae-pm-mapper:8443/delivery" + }, + "subscriber": "dradmin", + "groupid": 0, + "metadataOnly": false, + "privilegedSubscriber": true, + "subid": 1, + "last_mod": 1560872889000, + "feedid": 1, + "follow_redirect": false, + "decompress": true, + "aaf_instance": "legacy", + "links": { + "feed": "https://dmaap-dr-prov/feed/1", + "log": "https://dmaap-dr-prov/sublog/1", + "self": "https://dmaap-dr-prov/subs/1" + }, + "created_date": 1560872889000 + } + ], + "parameters": { + "ACTIVE_POD": "dmaap-dr-prov", + "DELIVERY_FILE_PROCESS_INTERVAL": 10, + "DELIVERY_INIT_RETRY_INTERVAL": 10, + "DELIVERY_MAX_AGE": 86400, + "DELIVERY_MAX_RETRY_INTERVAL": 3600, + "DELIVERY_RETRY_RATIO": 2, + "LOGROLL_INTERVAL": 30, + "NODES": [ + "dmaap-dr-node" + ], + "PROV_ACTIVE_NAME": "dmaap-dr-prov", + "PROV_AUTH_ADDRESSES": [ + "dmaap-dr-prov", + "dmaap-dr-node" + ], + "PROV_AUTH_SUBJECTS": [ + "" + ], + "PROV_DOMAIN": "", + "PROV_MAXFEED_COUNT": 10000, + "PROV_MAXSUB_COUNT": 100000, + "PROV_NAME": "dmaap-dr-prov", + "PROV_REQUIRE_CERT": "false", + "PROV_REQUIRE_SECURE": "true", + "STANDBY_POD": "", + "_INT_VALUES": [ + "LOGROLL_INTERVAL", + "PROV_MAXFEED_COUNT", + "PROV_MAXSUB_COUNT", + "DELIVERY_INIT_RETRY_INTERVAL", + "DELIVERY_MAX_RETRY_INTERVAL", + "DELIVERY_RETRY_RATIO", + "DELIVERY_MAX_AGE", + "DELIVERY_FILE_PROCESS_INTERVAL" + ] + }, + "ingress": [ + { + "feedid": 1, + "subnet": "", + "user": "", + "node": [ + "stub_from." + ] + } + ], + "egress": { + "1": "stub_to." + }, + "routing": [ + { + "from": 1, + "to": 3, + "via": 2 + } + ] +}
\ No newline at end of file |