aboutsummaryrefslogtreecommitdiffstats
path: root/datarouter-prov/src
diff options
context:
space:
mode:
Diffstat (limited to 'datarouter-prov/src')
-rwxr-xr-xdatarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java6
-rw-r--r--datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java4
-rwxr-xr-xdatarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java2
-rw-r--r--datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java314
-rw-r--r--datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java2
-rwxr-xr-xdatarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java212
-rwxr-xr-xdatarouter-prov/src/test/resources/create.sql3
-rwxr-xr-xdatarouter-prov/src/test/resources/h2Database.properties9
-rw-r--r--datarouter-prov/src/test/resources/prov_data.json129
9 files changed, 524 insertions, 157 deletions
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
index 7475b6b9..ef106ab4 100755
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
@@ -242,7 +242,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
private static String provName = "feeds-drtr.web.att.com";
/**
- * The standard FQDN of the ACTIVE provisioning server in this Data Router ecosystem
+ * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem
*/
private static String activeProvName = "feeds-drtr.web.att.com";
@@ -719,7 +719,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
}
/**
- * Gets the FQDN of the initially ACTIVE provisioning server (POD). Note: this used to be called isActivePOD(),
+ * Gets the FQDN of the initially ACTIVE_POD provisioning server (POD). Note: this used to be called isActivePOD(),
* however, that is a misnomer, as the active status could shift to the standby POD without these parameters
* changing. Hence, the function names have been changed to more accurately reflect their purpose.
*
@@ -730,7 +730,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
}
/**
- * Gets the FQDN of the initially STANDBY provisioning server (POD). Note: this used to be called isStandbyPOD(),
+ * Gets the FQDN of the initially STANDBY_POD provisioning server (POD). Note: this used to be called isStandbyPOD(),
* however, that is a misnomer, as the standby status could shift to the active POD without these parameters
* changing. Hence, the function names have been changed to more accurately reflect their purpose.
*
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java
index 8afa18a1..6cb8520d 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/Poker.java
@@ -86,8 +86,8 @@ public class Poker extends TimerTask {
try {
thisPod = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
- thisPod = "*UNKNOWN*"; // not a major problem
- logger.info("UnknownHostException: Setting thisPod to \"*UNKNOWN*\"", e);
+ thisPod = "*UNKNOWN_POD*"; // not a major problem
+ logger.info("UnknownHostException: Setting thisPod to \"*UNKNOWN_POD*\"", e);
}
provString = buildProvisioningString();
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java
index 77917771..75423602 100755
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/ProxyServlet.java
@@ -146,7 +146,7 @@ public class ProxyServlet extends BaseServlet {
*/
public boolean isProxyServer() {
SynchronizerTask st = SynchronizerTask.getSynchronizer();
- return st.getState() == SynchronizerTask.STANDBY;
+ return st.getPodState() == SynchronizerTask.STANDBY_POD;
}
/**
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java
index d3ae4fe3..8c5a49a4 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTask.java
@@ -24,6 +24,11 @@
package org.onap.dmaap.datarouter.provisioning;
+import static org.onap.dmaap.datarouter.provisioning.BaseServlet.TEXT_CT;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -50,8 +55,6 @@ import java.util.TreeSet;
import javax.servlet.http.HttpServletResponse;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
@@ -85,7 +88,7 @@ import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
* <li>Checking DNS once per minute to see which POD the DNS CNAME points to. The CNAME will point to
* the active (master) POD.</li>
* <li>On non-master (standby) PODs, fetches provisioning data and logs in order to keep MariaDB in sync.</li>
- * <li>Providing information to other parts of the system as to the current role (ACTIVE, STANDBY, UNKNOWN)
+ * <li>Providing information to other parts of the system as to the current role (ACTIVE_POD, STANDBY_POD, UNKNOWN_POD)
* of this POD.</li>
* </ol>
* <p>For this to work correctly, the following code needs to be placed at the beginning of main().</p>
@@ -99,167 +102,152 @@ import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
public class SynchronizerTask extends TimerTask {
/**
- * This is a singleton -- there is only one SynchronizerTask object in the server
+ * This is a singleton -- there is only one SynchronizerTask object in the server.
*/
private static SynchronizerTask synctask;
/**
- * This POD is unknown -- not on the list of PODs
+ * This POD is unknown -- not on the list of PODs.
*/
- public static final int UNKNOWN = 0;
+ public static final int UNKNOWN_POD = 0;
/**
- * This POD is active -- on the list of PODs, and the DNS CNAME points to us
+ * This POD is active -- on the list of PODs, and the DNS CNAME points to us.
*/
- public static final int ACTIVE = 1;
+ public static final int ACTIVE_POD = 1;
/**
- * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us
+ * This POD is standby -- on the list of PODs, and the DNS CNAME does not point to us.
*/
- public static final int STANDBY = 2;
- private static final String[] stnames = {"UNKNOWN", "ACTIVE", "STANDBY"};
+ public static final int STANDBY_POD = 2;
+
+ private static final String[] stnames = {"UNKNOWN_POD", "ACTIVE_POD", "STANDBY_POD"};
private static final long ONE_HOUR = 60 * 60 * 1000L;
+ private long nextMsg = 0; // only display the "Current podState" msg every 5 mins.
+
private final EELFLogger logger;
private final Timer rolex;
private final String spooldir;
- private int state;
+ private int podState;
private boolean doFetch;
private long nextsynctime;
private AbstractHttpClient httpclient = null;
- /**
- * Get the singleton SynchronizerTask object.
- *
- * @return the SynchronizerTask
- */
- public static synchronized SynchronizerTask getSynchronizer() {
- if (synctask == null) {
- synctask = new SynchronizerTask();
- }
- return synctask;
- }
-
@SuppressWarnings("deprecation")
private SynchronizerTask() {
logger = EELFManager.getInstance().getLogger("InternalLog");
rolex = new Timer();
spooldir = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
- state = UNKNOWN;
+ podState = UNKNOWN_POD;
doFetch = true; // start off with a fetch
nextsynctime = 0;
- logger.info("PROV5000: Sync task starting, server state is UNKNOWN");
+ logger.info("PROV5000: Sync task starting, server podState is UNKNOWN_POD");
try {
Properties props = (new DB()).getProperties();
String type = props.getProperty(Main.KEYSTORE_TYPE_PROPERTY, "jks");
String store = props.getProperty(Main.KEYSTORE_PATH_PROPERTY);
String pass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
KeyStore keyStore = KeyStore.getInstance(type);
- try(FileInputStream instream = new FileInputStream(new File(store))) {
+ try (FileInputStream instream = new FileInputStream(new File(store))) {
keyStore.load(instream, pass.toCharArray());
}
- store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
- pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
- KeyStore trustStore = null;
- if (store != null && store.length() > 0) {
- trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
- try(FileInputStream instream = new FileInputStream(new File(store))){
- trustStore.load(instream, pass.toCharArray());
+ store = props.getProperty(Main.TRUSTSTORE_PATH_PROPERTY);
+ pass = props.getProperty(Main.TRUSTSTORE_PASS_PROPERTY);
+ KeyStore trustStore = null;
+ if (store != null && store.length() > 0) {
+ trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
+ try (FileInputStream instream = new FileInputStream(new File(store))) {
+ trustStore.load(instream, pass.toCharArray());
- }
}
+ }
// We are connecting with the node name, but the certificate will have the CNAME
// So we need to accept a non-matching certificate name
- String keystorepass = props.getProperty(
- Main.KEYSTORE_PASS_PROPERTY); //itrack.web.att.com/browse/DATARTR-6 for changing hard coded passphase ref
- try(AbstractHttpClient hc = new DefaultHttpClient()) {
- SSLSocketFactory socketFactory =
- (trustStore == null)
- ? new SSLSocketFactory(keyStore, keystorepass)
- : new SSLSocketFactory(keyStore, keystorepass, trustStore);
- socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
- Scheme sch = new Scheme("https", 443, socketFactory);
- hc.getConnectionManager().getSchemeRegistry().register(sch);
- httpclient = hc;
- }
- // Run once every 5 seconds to check DNS, etc.
- long interval = 0;
- try {
- String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
- interval = Long.parseLong(s);
- } catch (NumberFormatException e) {
- interval = 5000L;
+ String keystorepass = props.getProperty(Main.KEYSTORE_PASS_PROPERTY);
+ try (AbstractHttpClient hc = new DefaultHttpClient()) {
+ SSLSocketFactory socketFactory =
+ (trustStore == null)
+ ? new SSLSocketFactory(keyStore, keystorepass)
+ : new SSLSocketFactory(keyStore, keystorepass, trustStore);
+ socketFactory.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+ Scheme sch = new Scheme("https", 443, socketFactory);
+ hc.getConnectionManager().getSchemeRegistry().register(sch);
+ httpclient = hc;
}
- rolex.scheduleAtFixedRate(this, 0L, interval);
+ setSynchTimer(props);
} catch (Exception e) {
logger.warn("PROV5005: Problem starting the synchronizer: " + e);
}
}
+ private void setSynchTimer(Properties props) {
+ // Run once every 5 seconds to check DNS, etc.
+ long interval;
+ try {
+ String s = props.getProperty("org.onap.dmaap.datarouter.provserver.sync_interval", "5000");
+ interval = Long.parseLong(s);
+ } catch (NumberFormatException e) {
+ interval = 5000L;
+ }
+ rolex.scheduleAtFixedRate(this, 0L, interval);
+ }
+
+ /**
+ * Get the singleton SynchronizerTask object.
+ *
+ * @return the SynchronizerTask
+ */
+ public static synchronized SynchronizerTask getSynchronizer() {
+ if (synctask == null) {
+ synctask = new SynchronizerTask();
+ }
+ return synctask;
+ }
+
/**
- * What is the state of this POD?
+ * What is the podState of this POD?.
*
- * @return one of ACTIVE, STANDBY, UNKNOWN
+ * @return one of ACTIVE_POD, STANDBY_POD, UNKNOWN_POD
*/
- public int getState() {
- return state;
+ public int getPodState() {
+ return podState;
}
/**
- * Is this the active POD?
+ * Is this the active POD?.
*
* @return true if we are active (the master), false otherwise
*/
public boolean isActive() {
- return state == ACTIVE;
+ return podState == ACTIVE_POD;
}
/**
* This method is used to signal that another POD (the active POD) has sent us a /fetchProv request, and that we
* should re-synchronize with the master.
*/
- public void doFetch() {
+ void doFetch() {
doFetch = true;
}
/**
* Runs once a minute in order to <ol>
* <li>lookup DNS names,</li>
- * <li>determine the state of this POD,</li>
- * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of state from the active POD.</li>
+ * <li>determine the podState of this POD,</li>
+ * <li>if this is a standby POD, and the fetch flag is set, perform a fetch of podState from the active POD.</li>
* <li>if this is a standby POD, check if there are any new log records to be replicated.</li>
- * </ol>
+ * </ol>.
*/
@Override
public void run() {
try {
- state = lookupState();
- if (state == STANDBY) {
+ podState = lookupState();
+ if (podState == STANDBY_POD) {
// Only copy provisioning data FROM the active server TO the standby
if (doFetch || (System.currentTimeMillis() >= nextsynctime)) {
- logger.debug("Initiating a sync...");
- JSONObject jo = readProvisioningJSON();
- if (jo != null) {
- doFetch = false;
- syncFeeds(jo.getJSONArray("feeds"));
- syncSubs(jo.getJSONArray("subscriptions"));
- syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
- syncParams(jo.getJSONObject("parameters"));
- // The following will not be present in a version=1.0 provfeed
- JSONArray ja = jo.optJSONArray("ingress");
- if (ja != null) {
- syncIngressRoutes(ja);
- }
- JSONObject j2 = jo.optJSONObject("egress");
- if (j2 != null) {
- syncEgressRoutes(j2);
- }
- ja = jo.optJSONArray("routing");
- if (ja != null) {
- syncNetworkRoutes(ja);
- }
- }
+ syncProvisioningData();
logger.info("PROV5013: Sync completed.");
nextsynctime = System.currentTimeMillis() + ONE_HOUR;
}
@@ -278,7 +266,7 @@ public class SynchronizerTask extends TimerTask {
remote.andNot(local);
if (!remote.isEmpty()) {
logger.debug(" Replicating logs: " + remote);
- replicateDRLogs(remote);
+ replicateDataRouterLogs(remote);
}
}
} catch (Exception e) {
@@ -286,14 +274,39 @@ public class SynchronizerTask extends TimerTask {
}
}
+ private void syncProvisioningData() {
+ logger.debug("Initiating a sync...");
+ JSONObject jo = readProvisioningJson();
+ if (jo != null) {
+ doFetch = false;
+ syncFeeds(jo.getJSONArray("feeds"));
+ syncSubs(jo.getJSONArray("subscriptions"));
+ syncGroups(jo.getJSONArray("groups")); //Rally:US708115 - 1610
+ syncParams(jo.getJSONObject("parameters"));
+ // The following will not be present in a version=1.0 provfeed
+ JSONArray ja = jo.optJSONArray("ingress");
+ if (ja != null) {
+ syncIngressRoutes(ja);
+ }
+ JSONObject j2 = jo.optJSONObject("egress");
+ if (j2 != null) {
+ syncEgressRoutes(j2);
+ }
+ ja = jo.optJSONArray("routing");
+ if (ja != null) {
+ syncNetworkRoutes(ja);
+ }
+ }
+ }
+
/**
- * This method is used to lookup the CNAME that points to the active server. It returns 0 (UNKNOWN), 1(ACTIVE), or 2
- * (STANDBY) to indicate the state of this server.
+ * This method is used to lookup the CNAME that points to the active server.
+ * It returns 0 (UNKNOWN_POD), 1(ACTIVE_POD), or (STANDBY_POD) to indicate the podState of this server.
*
- * @return the current state
+ * @return the current podState
*/
- private int lookupState() {
- int newstate = UNKNOWN;
+ int lookupState() {
+ int newPodState = UNKNOWN_POD;
try {
InetAddress myaddr = InetAddress.getLocalHost();
if (logger.isTraceEnabled()) {
@@ -303,9 +316,9 @@ public class SynchronizerTask extends TimerTask {
Set<String> pods = new TreeSet<>(Arrays.asList(BaseServlet.getPods()));
if (pods.contains(thisPod)) {
InetAddress pserver = InetAddress.getByName(BaseServlet.getActiveProvName());
- newstate = myaddr.equals(pserver) ? ACTIVE : STANDBY;
+ newPodState = myaddr.equals(pserver) ? ACTIVE_POD : STANDBY_POD;
if (logger.isDebugEnabled() && System.currentTimeMillis() >= nextMsg) {
- logger.debug("Active POD = " + pserver + ", Current state is " + stnames[newstate]);
+ logger.debug("Active POD = " + pserver + ", Current podState is " + stnames[newPodState]);
nextMsg = System.currentTimeMillis() + (5 * 60 * 1000L);
}
} else {
@@ -315,15 +328,13 @@ public class SynchronizerTask extends TimerTask {
logger.warn("PROV5002: Cannot determine the name of this provisioning server.", e);
}
- if (newstate != state) {
- logger
- .info(String.format("PROV5001: Server state changed from %s to %s", stnames[state], stnames[newstate]));
+ if (newPodState != podState) {
+ logger.info(String.format("PROV5001: Server podState changed from %s to %s",
+ stnames[podState], stnames[newPodState]));
}
- return newstate;
+ return newPodState;
}
- private static long nextMsg = 0; // only display the "Current state" msg every 5 mins.
-
/**
* Synchronize the Feeds in the JSONArray, with the Feeds in the DB.
*/
@@ -396,7 +407,7 @@ public class SynchronizerTask extends TimerTask {
try {
v = "" + jo.getInt(k);
} catch (JSONException e1) {
- logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e);
+ logger.warn("PROV5004: Invalid object in parameters: " + jo.optInt(k), e1);
JSONArray ja = jo.getJSONArray(k);
for (int i = 0; i < ja.length(); i++) {
if (i > 0) {
@@ -455,7 +466,7 @@ public class SynchronizerTask extends TimerTask {
NetworkRoute nr = new NetworkRoute(ja.getJSONObject(n));
coll.add(nr);
} catch (JSONException e) {
- logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n));
+ logger.warn("PROV5004: Invalid object in network routes: " + ja.optJSONObject(n), e);
}
}
if (sync(coll, NetworkRoute.getAllNetworkRoutes())) {
@@ -477,29 +488,11 @@ public class SynchronizerTask extends TimerTask {
Syncable newobj = newmap.get(n);
Syncable oldobj = oldmap.get(n);
if (oldobj == null) {
- if (logger.isDebugEnabled()) {
- logger.debug(" Inserting record: " + newobj);
- }
- newobj.doInsert(conn);
- changes = true;
+ changes = insertRecord(conn, newobj);
} else if (newobj == null) {
- if (logger.isDebugEnabled()) {
- logger.debug(" Deleting record: " + oldobj);
- }
- oldobj.doDelete(conn);
- changes = true;
+ changes = deleteRecord(conn, oldobj);
} else if (!newobj.equals(oldobj)) {
- if (logger.isDebugEnabled()) {
- logger.debug(" Updating record: " + newobj);
- }
- newobj.doUpdate(conn);
-
- /**Rally US708115
- * Change Ownership of FEED - 1610, Syncronised with secondary DB.
- * */
- checkChnageOwner(newobj, oldobj);
-
- changes = true;
+ changes = updateRecord(conn, newobj, oldobj);
}
}
db.release(conn);
@@ -509,6 +502,30 @@ public class SynchronizerTask extends TimerTask {
return changes;
}
+ private boolean updateRecord(Connection conn, Syncable newobj, Syncable oldobj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Updating record: " + newobj);
+ }
+ boolean changes = newobj.doUpdate(conn);
+ checkChangeOwner(newobj, oldobj);
+
+ return changes;
+ }
+
+ private boolean deleteRecord(Connection conn, Syncable oldobj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Deleting record: " + oldobj);
+ }
+ return oldobj.doDelete(conn);
+ }
+
+ private boolean insertRecord(Connection conn, Syncable newobj) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" Inserting record: " + newobj);
+ }
+ return newobj.doInsert(conn);
+ }
+
private Map<String, Syncable> getMap(Collection<? extends Syncable> c) {
Map<String, Syncable> map = new HashMap<>();
for (Syncable v : c) {
@@ -517,18 +534,18 @@ public class SynchronizerTask extends TimerTask {
return map;
}
- /**Change owner of FEED/SUBSCRIPTION*/
/**
+ * Change owner of FEED/SUBSCRIPTION.
* Rally US708115 Change Ownership of FEED - 1610
*/
- private void checkChnageOwner(Syncable newobj, Syncable oldobj) {
+ private void checkChangeOwner(Syncable newobj, Syncable oldobj) {
if (newobj instanceof Feed) {
Feed oldfeed = (Feed) oldobj;
Feed newfeed = (Feed) newobj;
if (!oldfeed.getPublisher().equals(newfeed.getPublisher())) {
- logger.info("PROV5013 - Previous publisher: " + oldfeed.getPublisher() + ": New publisher-" + newfeed
- .getPublisher());
+ logger.info("PROV5013 - Previous publisher: "
+ + oldfeed.getPublisher() + ": New publisher-" + newfeed.getPublisher());
oldfeed.setPublisher(newfeed.getPublisher());
oldfeed.changeOwnerShip();
}
@@ -537,8 +554,8 @@ public class SynchronizerTask extends TimerTask {
Subscription newsub = (Subscription) newobj;
if (!oldsub.getSubscriber().equals(newsub.getSubscriber())) {
- logger.info("PROV5013 - Previous subscriber: " + oldsub.getSubscriber() + ": New subscriber-" + newsub
- .getSubscriber());
+ logger.info("PROV5013 - Previous subscriber: "
+ + oldsub.getSubscriber() + ": New subscriber-" + newsub.getSubscriber());
oldsub.setSubscriber(newsub.getSubscriber());
oldsub.changeOwnerShip();
}
@@ -551,26 +568,26 @@ public class SynchronizerTask extends TimerTask {
*
* @return the provisioning data (as a JONObject)
*/
- private synchronized JSONObject readProvisioningJSON() {
+ private synchronized JSONObject readProvisioningJson() {
String url = URLUtilities.generatePeerProvURL();
HttpGet get = new HttpGet(url);
try {
HttpResponse response = httpclient.execute(get);
int code = response.getStatusLine().getStatusCode();
if (code != HttpServletResponse.SC_OK) {
- logger.warn("PROV5010: readProvisioningJSON failed, bad error code: " + code);
+ logger.warn("PROV5010: readProvisioningJson failed, bad error code: " + code);
return null;
}
HttpEntity entity = response.getEntity();
String ctype = entity.getContentType().getValue().trim();
- if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1) && !ctype
- .equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
- logger.warn("PROV5011: readProvisioningJSON failed, bad content type: " + ctype);
+ if (!ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE1)
+ && !ctype.equals(BaseServlet.PROVFULL_CONTENT_TYPE2)) {
+ logger.warn("PROV5011: readProvisioningJson failed, bad content type: " + ctype);
return null;
}
return new JSONObject(new JSONTokener(entity.getContent()));
} catch (Exception e) {
- logger.warn("PROV5012: readProvisioningJSON failed, exception: " + e);
+ logger.warn("PROV5012: readProvisioningJson failed, exception: " + e);
return null;
} finally {
get.releaseConnection();
@@ -583,7 +600,7 @@ public class SynchronizerTask extends TimerTask {
*
* @return the bitset
*/
- private RLEBitSet readRemoteLoglist() {
+ RLEBitSet readRemoteLoglist() {
RLEBitSet bs = new RLEBitSet();
String url = URLUtilities.generatePeerLogsURL();
@@ -594,7 +611,7 @@ public class SynchronizerTask extends TimerTask {
//End of fix.
HttpGet get = new HttpGet(url);
- try {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
HttpResponse response = httpclient.execute(get);
int code = response.getStatusLine().getStatusCode();
if (code != HttpServletResponse.SC_OK) {
@@ -603,13 +620,12 @@ public class SynchronizerTask extends TimerTask {
}
HttpEntity entity = response.getEntity();
String ctype = entity.getContentType().getValue().trim();
- if (!"text/plain".equals(ctype)) {
+ if (!TEXT_CT.equals(ctype)) {
logger.warn("PROV5011: readRemoteLoglist failed, bad content type: " + ctype);
return bs;
}
InputStream is = entity.getContent();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- int ch = 0;
+ int ch;
while ((ch = is.read()) >= 0) {
bos.write(ch);
}
@@ -630,12 +646,12 @@ public class SynchronizerTask extends TimerTask {
*
* @param bs the bitset (an RELBitSet) of log records to fetch
*/
- private void replicateDRLogs(RLEBitSet bs) {
+ void replicateDataRouterLogs(RLEBitSet bs) {
String url = URLUtilities.generatePeerLogsURL();
HttpPost post = new HttpPost(url);
try {
String t = bs.toString();
- HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create("text/plain"));
+ HttpEntity body = new ByteArrayEntity(t.getBytes(), ContentType.create(TEXT_CT));
post.setEntity(body);
if (logger.isDebugEnabled()) {
logger.debug("Requesting records: " + t);
@@ -644,13 +660,13 @@ public class SynchronizerTask extends TimerTask {
HttpResponse response = httpclient.execute(post);
int code = response.getStatusLine().getStatusCode();
if (code != HttpServletResponse.SC_OK) {
- logger.warn("PROV5010: replicateDRLogs failed, bad error code: " + code);
+ logger.warn("PROV5010: replicateDataRouterLogs failed, bad error code: " + code);
return;
}
HttpEntity entity = response.getEntity();
String ctype = entity.getContentType().getValue().trim();
- if (!"text/plain".equals(ctype)) {
- logger.warn("PROV5011: replicateDRLogs failed, bad content type: " + ctype);
+ if (!TEXT_CT.equals(ctype)) {
+ logger.warn("PROV5011: replicateDataRouterLogs failed, bad content type: " + ctype);
return;
}
@@ -661,7 +677,7 @@ public class SynchronizerTask extends TimerTask {
Files.move(tmppath, donepath, StandardCopyOption.REPLACE_EXISTING);
logger.info("Approximately " + bs.cardinality() + " records replicated.");
} catch (Exception e) {
- logger.warn("PROV5012: replicateDRLogs failed, exception: " + e);
+ logger.warn("PROV5012: replicateDataRouterLogs failed, exception: " + e);
} finally {
post.releaseConnection();
}
diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java
index bad6e2cb..e2076b9d 100644
--- a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java
+++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/DrServletTestBase.java
@@ -50,7 +50,7 @@ public class DrServletTestBase {
FieldUtils.writeDeclaredStaticField(DB.class, "props", props, true);
FieldUtils.writeDeclaredStaticField(BaseServlet.class, "startmsgFlag", false, true);
SynchronizerTask synchronizerTask = mock(SynchronizerTask.class);
- when(synchronizerTask.getState()).thenReturn(SynchronizerTask.UNKNOWN);
+ when(synchronizerTask.getPodState()).thenReturn(SynchronizerTask.UNKNOWN_POD);
FieldUtils.writeDeclaredStaticField(BaseServlet.class, "synctask", synchronizerTask, true);
}
diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java
new file mode 100755
index 00000000..79d83899
--- /dev/null
+++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/SynchronizerTaskTest.java
@@ -0,0 +1,212 @@
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+package org.onap.dmaap.datarouter.provisioning;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import com.att.eelf.configuration.EELFManager;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.Persistence;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.client.AbstractHttpClient;
+import org.apache.http.message.BasicHeader;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.onap.dmaap.datarouter.provisioning.utils.LogfileLoader;
+import org.onap.dmaap.datarouter.provisioning.utils.RLEBitSet;
+import org.onap.dmaap.datarouter.provisioning.utils.URLUtilities;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore("javax.net.ssl.*")
+@PrepareForTest({BaseServlet.class, URLUtilities.class})
+public class SynchronizerTaskTest {
+
+ @Mock
+ private AbstractHttpClient httpClient;
+
+ @Mock
+ private HttpEntity httpEntity;
+
+ @Mock
+ private StatusLine statusLine;
+
+ @Mock
+ private CloseableHttpResponse response;
+
+ @Mock
+ private ByteArrayOutputStream byteArrayOutputStream;
+
+ private SynchronizerTask synchronizerTask;
+
+ private ExecutorService executorService;
+
+ private static EntityManagerFactory emf;
+ private static EntityManager em;
+
+ @BeforeClass
+ public static void init() {
+ emf = Persistence.createEntityManagerFactory("dr-unit-tests");
+ em = emf.createEntityManager();
+ System.setProperty(
+ "org.onap.dmaap.datarouter.provserver.properties",
+ "src/test/resources/h2Database.properties");
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ em.clear();
+ em.close();
+ emf.close();
+ }
+
+
+ @Before
+ public void setUp() throws IllegalAccessException, UnknownHostException {
+ SSLSocketFactory sslSocketFactory = mock(SSLSocketFactory.class);
+ doNothing().when(sslSocketFactory).setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+
+ PowerMockito.mockStatic(BaseServlet.class);
+ PowerMockito.mockStatic(URLUtilities.class);
+ when(BaseServlet.getPods()).thenReturn(new String[] {InetAddress.getLocalHost().getHostName(), "stand-by-prov"});
+ when(URLUtilities.generatePeerProvURL()).thenReturn("https://stand-by-prov/internal/prov");
+ when(URLUtilities.generatePeerLogsURL()).thenReturn("https://stand-by-prov/internal/drlogs");
+
+ synchronizerTask = Mockito.spy(SynchronizerTask.getSynchronizer());
+ doReturn(2).when(synchronizerTask).lookupState();
+
+ executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(synchronizerTask);
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+ executorService.shutdown();
+ executorService.awaitTermination(2, TimeUnit.SECONDS);
+ }
+
+ @Test
+ public void Given_Synch_Task_readRemoteLoglist_Called_And_Valid_BitSet_Returned_Success() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+ Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "text/plain"));
+ Mockito.when(httpEntity.getContent()).thenReturn(new ByteArrayInputStream("1-55251".getBytes()));
+ RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+ Assert.assertNotNull(rleBitSet);
+ }
+
+ @Test
+ public void Given_Synch_Task_readRemoteLoglist_Called_And_Invalid_Resonse_Code_Failure() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(404);
+ RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+ Assert.assertNotNull(rleBitSet);
+ }
+
+ @Test
+ public void Given_Synch_Task_readRemoteLoglist_Called_And_Invalid_Content_Type_Failure() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+ Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "invalid_content_type"));
+ RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+ Assert.assertNotNull(rleBitSet);
+ }
+
+ @Test
+ public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Valid_BitSet_Returned_Success() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+ Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "text/plain"));
+ RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+ synchronizerTask.replicateDataRouterLogs(rleBitSet);
+ }
+
+ @Test
+ public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Invalid_Content_Type_Failure() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+ Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "invalid_content_type"));
+ RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+ synchronizerTask.replicateDataRouterLogs(rleBitSet);
+ }
+
+ @Test
+ public void Given_Synch_Task_replicateDataRouterLogs_Called_And_Invalid_Resonse_Code_Failure() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(404);
+ RLEBitSet rleBitSet = synchronizerTask.readRemoteLoglist();
+ synchronizerTask.replicateDataRouterLogs(rleBitSet);
+ }
+
+ @Test
+ public void Given_Synch_Task_Is_Started_And_LogFileLoader_Is_Idle_Then_Standby_Pod_Synch_Is_Successful() throws Exception {
+ mockHttpClientForGetRequest();
+ Mockito.when(response.getStatusLine().getStatusCode()).thenReturn(200);
+ Mockito.when(httpEntity.getContentType()).thenReturn(new BasicHeader("header", "application/vnd.dmaap-dr.provfeed-full; version=1.0"));
+ mockResponseFromGet();
+ }
+
+
+ private void mockHttpClientForGetRequest() throws Exception {
+ FieldUtils.writeField(synchronizerTask, "httpclient", httpClient, true);
+ Mockito.when(httpClient.execute(anyObject())).thenReturn(response);
+ Mockito.when(response.getEntity()).thenReturn(httpEntity);
+ Mockito.when(response.getStatusLine()).thenReturn(statusLine);
+
+ }
+
+ private void mockResponseFromGet() throws IOException {
+ InputStream in = getClass().getClassLoader().getResourceAsStream("prov_data.json");
+ Mockito.when(httpEntity.getContent()).thenReturn(in);
+ }
+}
diff --git a/datarouter-prov/src/test/resources/create.sql b/datarouter-prov/src/test/resources/create.sql
index 7c106723..a811847c 100755
--- a/datarouter-prov/src/test/resources/create.sql
+++ b/datarouter-prov/src/test/resources/create.sql
@@ -186,6 +186,9 @@ insert into INGRESS_ROUTES(SEQUENCE, FEEDID , USERID, SUBNET, NODESET)
VALUES (2,1,'user',null,2);
insert into NODESETS(SETID, NODEID)
+VALUES (1,1);
+
+insert into NODESETS(SETID, NODEID)
VALUES (2,2);
insert into LOG_RECORDS(RECORD_ID,TYPE,EVENT_TIME,PUBLISH_ID,FEEDID,REQURI,METHOD,CONTENT_TYPE,CONTENT_LENGTH,FEED_FILEID,REMOTE_ADDR,USER,STATUS,DELIVERY_SUBID,DELIVERY_FILEID,RESULT,ATTEMPTS,REASON,FILENAME)
diff --git a/datarouter-prov/src/test/resources/h2Database.properties b/datarouter-prov/src/test/resources/h2Database.properties
index fee9c688..9c63aea4 100755
--- a/datarouter-prov/src/test/resources/h2Database.properties
+++ b/datarouter-prov/src/test/resources/h2Database.properties
@@ -29,4 +29,11 @@ org.onap.dmaap.datarouter.provserver.https.relaxation = false
org.onap.dmaap.datarouter.provserver.accesslog.dir = unit-test-logs
org.onap.dmaap.datarouter.provserver.spooldir = unit-test-logs/spool
org.onap.dmaap.datarouter.provserver.localhost = 127.0.0.1
-org.onap.dmaap.datarouter.provserver.passwordencryption = PasswordEncryptionKey#@$%^&1234# \ No newline at end of file
+org.onap.dmaap.datarouter.provserver.passwordencryption = PasswordEncryptionKey#@$%^&1234#
+
+org.onap.dmaap.datarouter.provserver.keystore.type = jks
+org.onap.dmaap.datarouter.provserver.keymanager.password = FZNkU,B%NJzcT1v7;^v]M#ZX
+org.onap.dmaap.datarouter.provserver.keystore.path = aaf_certs/org.onap.dmaap-dr.jks
+org.onap.dmaap.datarouter.provserver.keystore.password = FZNkU,B%NJzcT1v7;^v]M#ZX
+org.onap.dmaap.datarouter.provserver.truststore.path = aaf_certs/org.onap.dmaap-dr.trust.jks
+org.onap.dmaap.datarouter.provserver.truststore.password = +mzf@J.D^;3!![*Xr.z$c#?b \ No newline at end of file
diff --git a/datarouter-prov/src/test/resources/prov_data.json b/datarouter-prov/src/test/resources/prov_data.json
new file mode 100644
index 00000000..32536316
--- /dev/null
+++ b/datarouter-prov/src/test/resources/prov_data.json
@@ -0,0 +1,129 @@
+{
+ "feeds": [
+ {
+ "suspend": false,
+ "groupid": 0,
+ "description": "Default feed provisioned for PM File collector",
+ "version": "m1.0",
+ "authorization": {
+ "endpoint_addrs": [
+
+ ],
+ "classification": "unclassified",
+ "endpoint_ids": [
+ {
+ "password": "dradmin",
+ "id": "dradmin"
+ }
+ ]
+ },
+ "last_mod": 1560871903000,
+ "deleted": false,
+ "feedid": 1,
+ "name": "Default PM Feed",
+ "business_description": "Default Feed",
+ "aaf_instance": "legacy",
+ "publisher": "dradmin",
+ "links": {
+ "subscribe": "https://dmaap-dr-prov/subscribe/1",
+ "log": "https://dmaap-dr-prov/feedlog/1",
+ "publish": "https://dmaap-dr-prov/publish/1",
+ "self": "https://dmaap-dr-prov/feed/1"
+ },
+ "created_date": 1560871903000
+ }
+ ],
+ "groups": [
+ {
+ "authid": "GROUP-0000-c2754bb7-92ef-4869-9c6b-1bc1283be4c0",
+ "name": "Test Group",
+ "description": "Test Description of Group .",
+ "classification": "publisher/subscriber",
+ "members": "{id=attuid, name=User1}, {id=attuid, name=User 2]"
+ }
+ ],
+ "subscriptions": [
+ {
+ "suspend": false,
+ "delivery": {
+ "use100": true,
+ "password": "PASSWORD",
+ "user": "LOGIN",
+ "url": "https://dcae-pm-mapper:8443/delivery"
+ },
+ "subscriber": "dradmin",
+ "groupid": 0,
+ "metadataOnly": false,
+ "privilegedSubscriber": true,
+ "subid": 1,
+ "last_mod": 1560872889000,
+ "feedid": 1,
+ "follow_redirect": false,
+ "decompress": true,
+ "aaf_instance": "legacy",
+ "links": {
+ "feed": "https://dmaap-dr-prov/feed/1",
+ "log": "https://dmaap-dr-prov/sublog/1",
+ "self": "https://dmaap-dr-prov/subs/1"
+ },
+ "created_date": 1560872889000
+ }
+ ],
+ "parameters": {
+ "ACTIVE_POD": "dmaap-dr-prov",
+ "DELIVERY_FILE_PROCESS_INTERVAL": 10,
+ "DELIVERY_INIT_RETRY_INTERVAL": 10,
+ "DELIVERY_MAX_AGE": 86400,
+ "DELIVERY_MAX_RETRY_INTERVAL": 3600,
+ "DELIVERY_RETRY_RATIO": 2,
+ "LOGROLL_INTERVAL": 30,
+ "NODES": [
+ "dmaap-dr-node"
+ ],
+ "PROV_ACTIVE_NAME": "dmaap-dr-prov",
+ "PROV_AUTH_ADDRESSES": [
+ "dmaap-dr-prov",
+ "dmaap-dr-node"
+ ],
+ "PROV_AUTH_SUBJECTS": [
+ ""
+ ],
+ "PROV_DOMAIN": "",
+ "PROV_MAXFEED_COUNT": 10000,
+ "PROV_MAXSUB_COUNT": 100000,
+ "PROV_NAME": "dmaap-dr-prov",
+ "PROV_REQUIRE_CERT": "false",
+ "PROV_REQUIRE_SECURE": "true",
+ "STANDBY_POD": "",
+ "_INT_VALUES": [
+ "LOGROLL_INTERVAL",
+ "PROV_MAXFEED_COUNT",
+ "PROV_MAXSUB_COUNT",
+ "DELIVERY_INIT_RETRY_INTERVAL",
+ "DELIVERY_MAX_RETRY_INTERVAL",
+ "DELIVERY_RETRY_RATIO",
+ "DELIVERY_MAX_AGE",
+ "DELIVERY_FILE_PROCESS_INTERVAL"
+ ]
+ },
+ "ingress": [
+ {
+ "feedid": 1,
+ "subnet": "",
+ "user": "",
+ "node": [
+ "stub_from."
+ ]
+ }
+ ],
+ "egress": {
+ "1": "stub_to."
+ },
+ "routing": [
+ {
+ "from": 1,
+ "to": 3,
+ "via": 2
+ }
+ ]
+} \ No newline at end of file