aboutsummaryrefslogtreecommitdiffstats
path: root/datarouter-prov/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'datarouter-prov/src/main/java')
-rwxr-xr-xdatarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java345
-rw-r--r--datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java1
-rw-r--r--datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java11
-rw-r--r--datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java999
-rw-r--r--datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java12
5 files changed, 655 insertions, 713 deletions
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
index ef106ab4..3993b4df 100755
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java
@@ -24,30 +24,33 @@
package org.onap.dmaap.datarouter.provisioning;
+import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;
-
import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;
import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;
-import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID;
-
-
-import java.io.IOException;
-import java.io.InputStream;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import java.sql.Connection;
import java.sql.SQLException;
-
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
import org.apache.commons.lang3.StringUtils;
+import org.jetbrains.annotations.Nullable;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@@ -55,21 +58,19 @@ import org.json.JSONTokener;
import org.onap.dmaap.datarouter.authz.Authorizer;
import org.onap.dmaap.datarouter.authz.impl.ProvAuthorizer;
import org.onap.dmaap.datarouter.authz.impl.ProvDataProvider;
-import org.onap.dmaap.datarouter.provisioning.beans.*;
+import org.onap.dmaap.datarouter.provisioning.beans.Deleteable;
+import org.onap.dmaap.datarouter.provisioning.beans.Feed;
+import org.onap.dmaap.datarouter.provisioning.beans.Group;
+import org.onap.dmaap.datarouter.provisioning.beans.Insertable;
+import org.onap.dmaap.datarouter.provisioning.beans.NodeClass;
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
+import org.onap.dmaap.datarouter.provisioning.beans.Subscription;
+import org.onap.dmaap.datarouter.provisioning.beans.Updateable;
import org.onap.dmaap.datarouter.provisioning.utils.DB;
import org.onap.dmaap.datarouter.provisioning.utils.PasswordProcessor;
import org.onap.dmaap.datarouter.provisioning.utils.ThrottleFilter;
import org.slf4j.MDC;
-import javax.mail.*;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeBodyPart;
-import javax.mail.internet.MimeMessage;
-import javax.mail.internet.MimeMultipart;
-import java.security.GeneralSecurityException;
-import java.util.*;
-import java.util.regex.Pattern;
-
/**
* This is the base class for all Servlets in the provisioning code. It provides standard constants and some common
@@ -94,10 +95,10 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
static final String CREATE_PERMISSION = "create";
static final String EDIT_PERMISSION = "edit";
static final String DELETE_PERMISSION = "delete";
- static final String PUBLISH_PERMISSION = "publish";
- static final String SUSPEND_PERMISSION = "suspend";
- static final String RESTORE_PERMISSION = "restore";
- static final String SUBSCRIBE_PERMISSION = "subscribe";
+ private static final String PUBLISH_PERMISSION = "publish";
+ private static final String SUSPEND_PERMISSION = "suspend";
+ private static final String RESTORE_PERMISSION = "restore";
+ private static final String SUBSCRIBE_PERMISSION = "subscribe";
static final String APPROVE_SUB_PERMISSION = "approveSub";
static final String FEED_BASECONTENT_TYPE = "application/vnd.dmaap-dr.feed";
@@ -113,7 +114,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
//Adding groups functionality, ...1610
static final String GROUP_BASECONTENT_TYPE = "application/vnd.dmaap-dr.group";
static final String GROUP_CONTENT_TYPE = "application/vnd.dmaap-dr.group; version=2.0";
- public static final String GROUPFULL_CONTENT_TYPE = "application/vnd.dmaap-dr.group-full; version=2.0";
+ static final String GROUPFULL_CONTENT_TYPE = "application/vnd.dmaap-dr.group-full; version=2.0";
public static final String GROUPLIST_CONTENT_TYPE = "application/vnd.dmaap-dr.fegrouped-list; version=1.0";
@@ -130,127 +131,123 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
private static final int DEFAULT_POKETIMER2 = 30;
private static final String DEFAULT_DOMAIN = "onap";
private static final String DEFAULT_PROVSRVR_NAME = "dmaap-dr-prov";
- private static final String STATIC_ROUTING_NODES = ""; //Adding new param for static Routing - Rally:US664862-1610
//Common Errors
- public static final String MISSING_ON_BEHALF = "Missing X-DMAAP-DR-ON-BEHALF-OF header.";
- public static final String MISSING_FEED = "Missing or bad feed number.";
- public static final String POLICY_ENGINE = "Policy Engine disallows access.";
- public static final String UNAUTHORIZED = "Unauthorized.";
- public static final String BAD_SUB = "Missing or bad subscription number.";
- public static final String BAD_JSON = "Badly formed JSON";
- public static final String BAD_URL = "Bad URL.";
+ static final String MISSING_ON_BEHALF = "Missing X-DMAAP-DR-ON-BEHALF-OF header.";
+ static final String MISSING_FEED = "Missing or bad feed number.";
+ static final String POLICY_ENGINE = "Policy Engine disallows access.";
+ static final String UNAUTHORIZED = "Unauthorized.";
+ static final String BAD_SUB = "Missing or bad subscription number.";
+ static final String BAD_JSON = "Badly formed JSON";
+ static final String BAD_URL = "Bad URL.";
public static final String API = "/api/";
- public static final String LOGS = "/logs/";
- public static final String TEXT_CT = "text/plain";
- public static final String INGRESS = "/ingress/";
- public static final String EGRESS = "/egress/";
- public static final String NETWORK = "/network/";
- public static final String GROUPID = "groupid";
+ static final String LOGS = "/logs/";
+ static final String TEXT_CT = "text/plain";
+ static final String INGRESS = "/ingress/";
+ static final String EGRESS = "/egress/";
+ static final String NETWORK = "/network/";
+ static final String GROUPID = "groupid";
public static final String FEEDID = "feedid";
- public static final String FEEDIDS = "feedids";
- public static final String SUBID = "subid";
- public static final String EVENT_TYPE = "eventType";
- public static final String OUTPUT_TYPE = "output_type";
- public static final String START_TIME = "start_time";
- public static final String END_TIME = "end_time";
- public static final String REASON_SQL = "reasonSQL";
+ static final String FEEDIDS = "feedids";
+ static final String SUBID = "subid";
+ static final String EVENT_TYPE = "eventType";
+ static final String OUTPUT_TYPE = "output_type";
+ static final String START_TIME = "start_time";
+ static final String END_TIME = "end_time";
+ static final String REASON_SQL = "reasonSQL";
/**
- * A boolean to trigger one time "provisioning changed" event on startup
+ * A boolean to trigger one time "provisioning changed" event on startup.
*/
private static boolean startmsgFlag = true;
/**
- * This POD should require SSL connections from clients; pulled from the DB (PROV_REQUIRE_SECURE)
+ * This POD should require SSL connections from clients; pulled from the DB (PROV_REQUIRE_SECURE).
*/
private static boolean requireSecure = true;
/**
- * This POD should require signed, recognized certificates from clients; pulled from the DB (PROV_REQUIRE_CERT)
+ * This POD should require signed, recognized certificates from clients; pulled from the DB (PROV_REQUIRE_CERT).
*/
private static boolean requireCert = true;
/**
- * The set of authorized addresses and networks; pulled from the DB (PROV_AUTH_ADDRESSES)
+ * The set of authorized addresses and networks; pulled from the DB (PROV_AUTH_ADDRESSES).
*/
private static Set<String> authorizedAddressesAndNetworks = new HashSet<>();
/**
- * The set of authorized names; pulled from the DB (PROV_AUTH_SUBJECTS)
+ * The set of authorized names; pulled from the DB (PROV_AUTH_SUBJECTS).
*/
private static Set<String> authorizedNames = new HashSet<>();
/**
- * The FQDN of the initially "active" provisioning server in this Data Router ecosystem
+ * The FQDN of the initially "active" provisioning server in this Data Router ecosystem.
*/
private static String initialActivePod;
/**
- * The FQDN of the initially "standby" provisioning server in this Data Router ecosystem
+ * The FQDN of the initially "standby" provisioning server in this Data Router ecosystem.
*/
private static String initialStandbyPod;
/**
- * The FQDN of this provisioning server in this Data Router ecosystem
+ * The FQDN of this provisioning server in this Data Router ecosystem.
*/
private static String thisPod;
/**
- * "Timer 1" - used to determine when to notify nodes of provisioning changes
+ * "Timer 1" - used to determine when to notify nodes of provisioning changes.
*/
private static long pokeTimer1;
/**
- * "Timer 2" - used to determine when to notify nodes of provisioning changes
+ * "Timer 2" - used to determine when to notify nodes of provisioning changes.
*/
private static long pokeTimer2;
/**
- * Array of nodes names and/or FQDNs
+ * Array of nodes names and/or FQDNs.
*/
private static String[] nodes = new String[0];
/**
- * [DATARTR-27] Poke all the DR nodes : Array of nodes names and/or FQDNs
+ * [DATARTR-27] Poke all the DR nodes : Array of nodes names and/or FQDNs.
*/
private static String[] drnodes = new String[0];
/**
- * Array of node IP addresses
+ * Array of node IP addresses.
*/
private static InetAddress[] nodeAddresses = new InetAddress[0];
/**
- * Array of POD IP addresses
+ * Array of POD IP addresses.
*/
private static InetAddress[] podAddresses = new InetAddress[0];
/**
- * The maximum number of feeds allowed; pulled from the DB (PROV_MAXFEED_COUNT)
+ * The maximum number of feeds allowed; pulled from the DB (PROV_MAXFEED_COUNT).
*/
static int maxFeeds = 0;
/**
- * The maximum number of subscriptions allowed; pulled from the DB (PROV_MAXSUB_COUNT)
+ * The maximum number of subscriptions allowed; pulled from the DB (PROV_MAXSUB_COUNT).
*/
static int maxSubs = 0;
/**
- * The current number of feeds in the system
+ * The current number of feeds in the system.
*/
static int activeFeeds = 0;
/**
- * The current number of subscriptions in the system
+ * The current number of subscriptions in the system.
*/
static int activeSubs = 0;
/**
- * The domain used to generate a FQDN from the "bare" node names
+ * The domain used to generate a FQDN from the "bare" node names.
*/
private static String provDomain = "web.att.com";
/**
- * The standard FQDN of the provisioning server in this Data Router ecosystem
+ * The standard FQDN of the provisioning server in this Data Router ecosystem.
*/
private static String provName = "feeds-drtr.web.att.com";
/**
- * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem
+ * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem.
*/
private static String activeProvName = "feeds-drtr.web.att.com";
- //Adding new param for static Routing - Rally:US664862-1610
- private static String staticRoutingNodes = STATIC_ROUTING_NODES;
-
/**
- * This logger is used to log provisioning events
+ * This logger is used to log provisioning events.
*/
protected static EELFLogger eventlogger;
/**
@@ -258,21 +255,17 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
*/
protected static EELFLogger intlogger;
/**
- * Authorizer - interface to the Policy Engine
+ * Authorizer - interface to the Policy Engine.
*/
protected static Authorizer authz;
/**
- * The Synchronizer used to sync active DB to standby one
+ * The Synchronizer used to sync active DB to standby one.
*/
private static SynchronizerTask synctask = null;
//Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
private InetAddress thishost;
private InetAddress loopback;
- private static Boolean mailSendFlag = false;
-
- private static final String MAILCONFIG_FILE = "mail.properties";
- private static Properties mailprops;
//DMAAP-597 (Tech Dept) REST request source IP auth relaxation to accommodate OOM kubernetes deploy
private static String isAddressAuthEnabled = (new DB()).getProperties()
@@ -285,10 +278,10 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
* Initialize data common to all the provisioning server servlets.
*/
protected BaseServlet() {
- if(eventlogger == null) {
- this.eventlogger = EELFManager.getInstance().getLogger("EventLog");
+ if (eventlogger == null) {
+ eventlogger = EELFManager.getInstance().getLogger("EventLog");
}
- if(intlogger == null) {
+ if (intlogger == null) {
this.intlogger = EELFManager.getInstance().getLogger("InternalLog");
}
if (authz == null) {
@@ -329,7 +322,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
}
/**
- * Read the request's input stream and return a JSONObject from it
+ * Read the request's input stream and return a JSONObject from it.
*
* @param req the HTTP request
* @return the JSONObject, or null if the stream cannot be parsed
@@ -348,35 +341,40 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
}
/**
- * This method encrypt/decrypt the key in the JSON passed by user request inside the authorisation header object in request before logging the JSON.
+ * This method encrypt/decrypt the key in the JSON passed by user request inside the authorisation
+ * header object in request before logging the JSON.
*
- * @param jo- the JSON passed in http request.
- * @param maskKey- the key to be masked in the JSON passed.
- * @param action- whether to mask the key or unmask it in a JSON passed.
+ * @param jo the JSON passed in http request.
+ * @param maskKey the key to be masked in the JSON passed.
+ * @param action whether to mask the key or unmask it in a JSON passed.
* @return the JSONObject, or null if the stream cannot be parsed.
*/
- public static JSONObject maskJSON(JSONObject jo, String maskKey, boolean action) {
+ static JSONObject maskJSON(JSONObject jo, String maskKey, boolean action) {
if (!jo.isNull("authorization")) {
- JSONObject j2 = jo.getJSONObject("authorization");
- JSONArray ja = j2.getJSONArray("endpoint_ids");
- for (int i = 0; i < ja.length(); i++) {
- if ((!ja.getJSONObject(i).isNull(maskKey))) {
- String password = ja.getJSONObject(i).get(maskKey).toString();
- try {
- if (action) {
- ja.getJSONObject(i).put(maskKey, PasswordProcessor.encrypt(password));
- } else {
- ja.getJSONObject(i).put(maskKey, PasswordProcessor.decrypt(password));
- }
- } catch (JSONException | GeneralSecurityException e) {
- intlogger.info("Error reading JSON while masking: " + e);
- }
+ JSONArray endpointIds = jo.getJSONObject("authorization").getJSONArray("endpoint_ids");
+ for (int index = 0; index < endpointIds.length(); index++) {
+ if ((!endpointIds.getJSONObject(index).isNull(maskKey))) {
+ String password = endpointIds.getJSONObject(index).get(maskKey).toString();
+ processPassword(maskKey, action, endpointIds, index, password);
}
}
}
return jo;
}
+ private static void processPassword(String maskKey, boolean action, JSONArray endpointIds, int index,
+ String password) {
+ try {
+ if (action) {
+ endpointIds.getJSONObject(index).put(maskKey, PasswordProcessor.encrypt(password));
+ } else {
+ endpointIds.getJSONObject(index).put(maskKey, PasswordProcessor.decrypt(password));
+ }
+ } catch (JSONException | GeneralSecurityException e) {
+ intlogger.info("Error reading JSON while masking: " + e);
+ }
+ }
+
/**
* Check if the remote host is authorized to perform provisioning. Is the request secure? Is it coming from an
* authorized IP address or network (configured via PROV_AUTH_ADDRESSES)? Does it have a valid client certificate
@@ -393,20 +391,9 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
if (requireSecure && !request.isSecure()) {
return "Request must be made over an HTTPS connection.";
}
- // Is remote IP authorized?
- String remote = request.getRemoteAddr();
- try {
- boolean found = false;
- InetAddress ip = InetAddress.getByName(remote);
- for (String addrnet : authorizedAddressesAndNetworks) {
- found |= addressMatchesNetwork(ip, addrnet);
- }
- if (!found) {
- return "Unauthorized address: " + remote;
- }
- } catch (UnknownHostException e) {
- intlogger.error("PROV0051 BaseServlet.isAuthorizedForProvisioning: " + e.getMessage(), e);
- return "Unauthorized address: " + remote;
+ String remoteHostCheck = checkRemoteHostAuthorization(request);
+ if (remoteHostCheck != null) {
+ return remoteHostCheck;
}
// Does remote have a valid certificate?
if (requireCert) {
@@ -425,6 +412,26 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
return null;
}
+ @Nullable
+ private String checkRemoteHostAuthorization(HttpServletRequest request) {
+ // Is remote IP authorized?
+ String remote = request.getRemoteAddr();
+ try {
+ boolean found = false;
+ InetAddress ip = InetAddress.getByName(remote);
+ for (String addrnet : authorizedAddressesAndNetworks) {
+ found |= addressMatchesNetwork(ip, addrnet);
+ }
+ if (!found) {
+ return "Unauthorized address: " + remote;
+ }
+ } catch (UnknownHostException e) {
+ intlogger.error("PROV0051 BaseServlet.isAuthorizedForProvisioning: " + e.getMessage(), e);
+ return "Unauthorized address: " + remote;
+ }
+ return null;
+ }
+
/**
* Check if the remote IP address is authorized to see the /internal URL tree.
*
@@ -438,19 +445,19 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
}
InetAddress ip = InetAddress.getByName(request.getRemoteAddr());
for (InetAddress node : getNodeAddresses()) {
- if (node != null && ip.equals(node)) {
+ if (ip.equals(node)) {
return true;
}
}
for (InetAddress pod : getPodAddresses()) {
- if (pod != null && ip.equals(pod)) {
+ if (ip.equals(pod)) {
return true;
}
}
- if (thishost != null && ip.equals(thishost)) {
+ if (ip.equals(thishost)) {
return true;
}
- if (loopback != null && ip.equals(loopback)) {
+ if (ip.equals(loopback)) {
return true;
}
} catch (UnknownHostException e) {
@@ -468,7 +475,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
*/
private static boolean addressMatchesNetwork(InetAddress ip, String s) {
int mlen = -1;
- int n = s.indexOf("/");
+ int n = s.indexOf('/');
if (n >= 0) {
mlen = Integer.parseInt(s.substring(n + 1));
s = s.substring(0, n);
@@ -528,16 +535,16 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
maxSubs = getInt(map, Parameters.PROV_MAXSUB_COUNT, DEFAULT_MAX_SUBS);
pokeTimer1 = getInt(map, Parameters.PROV_POKETIMER1, DEFAULT_POKETIMER1);
pokeTimer2 = getInt(map, Parameters.PROV_POKETIMER2, DEFAULT_POKETIMER2);
- /**
- * The domain used to generate a FQDN from the "bare" node names
- */
+
+ // The domain used to generate a FQDN from the "bare" node names
provDomain = getString(map, Parameters.PROV_DOMAIN, DEFAULT_DOMAIN);
provName = getString(map, Parameters.PROV_NAME, DEFAULT_PROVSRVR_NAME);
activeProvName = getString(map, Parameters.PROV_ACTIVE_NAME, provName);
initialActivePod = getString(map, Parameters.ACTIVE_POD, "");
initialStandbyPod = getString(map, Parameters.STANDBY_POD, "");
- staticRoutingNodes = getString(map, Parameters.STATIC_ROUTING_NODES,
- ""); //Adding new param for static Routing - Rally:US664862-1610
+
+ //Adding new param for static Routing - Rally:US664862-1610
+ String staticRoutingNodes = getString(map, Parameters.STATIC_ROUTING_NODES, "");
activeFeeds = Feed.countActiveFeeds();
activeSubs = Subscription.countActiveSubscriptions();
try {
@@ -597,78 +604,11 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
}
}
-
- /**
- * Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. Load mail properties.
- *
- * @author vs215k
- **/
- private void loadMailProperties() {
- if (mailprops == null) {
- mailprops = new Properties();
- try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(MAILCONFIG_FILE)) {
- mailprops.load(inStream);
- } catch (IOException e) {
- intlogger.error("PROV9003 Opening properties: " + e.getMessage(), e);
- System.exit(1);
- }
- }
- }
-
-
- /**
- * Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047.
- *
- * @param email - list of email ids to notify if HTTP relexcation is enabled.
- * @author vs215k
- **/
- private void notifyPSTeam(String email) {
- loadMailProperties(); //Load HTTPS Relex mail properties.
- String[] emails = email.split(Pattern.quote("|"));
-
- Properties mailproperties = new Properties();
- mailproperties.put("mail.smtp.host", mailprops.get("com.att.dmaap.datarouter.mail.server"));
- mailproperties.put("mail.transport.protocol", mailprops.get("com.att.dmaap.datarouter.mail.protocol"));
-
- Session session = Session.getDefaultInstance(mailproperties, null);
- Multipart mp = new MimeMultipart();
- MimeBodyPart htmlPart = new MimeBodyPart();
-
- try {
-
- Message msg = new MimeMessage(session);
- msg.setFrom(new InternetAddress(mailprops.get("com.att.dmaap.datarouter.mail.from").toString()));
-
- InternetAddress[] addressTo = new InternetAddress[emails.length];
- for (int x = 0; x < emails.length; x++) {
- addressTo[x] = new InternetAddress(emails[x]);
- }
-
- msg.addRecipients(Message.RecipientType.TO, addressTo);
- msg.setSubject(mailprops.get("com.att.dmaap.datarouter.mail.subject").toString());
- htmlPart.setContent(mailprops.get("com.att.dmaap.datarouter.mail.body").toString()
- .replace("[SERVER]", InetAddress.getLocalHost().getHostName()), "text/html");
- mp.addBodyPart(htmlPart);
- msg.setContent(mp);
-
- intlogger.info(mailprops.get("com.att.dmaap.datarouter.mail.body").toString()
- .replace("[SERVER]", InetAddress.getLocalHost().getHostName()));
-
- Transport.send(msg);
- intlogger.info("HTTPS relaxation mail is sent to - : " + email);
-
- } catch (MessagingException e) {
- intlogger.error("Invalid email address, unable to send https relaxation mail to - : " + email, e);
- } catch (UnknownHostException uhe) {
- intlogger.error("UnknownHostException", uhe);
- }
- }
-
public static String getProvName() {
return provName;
}
- public static String getActiveProvName() {
+ static String getActiveProvName() {
return activeProvName;
}
@@ -696,7 +636,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
*
* @return an array of InetAddresses
*/
- public static InetAddress[] getNodeAddresses() {
+ private static InetAddress[] getNodeAddresses() {
return nodeAddresses;
}
@@ -814,7 +754,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
private static boolean getBoolean(Map<String, String> map, String name) {
String s = map.get(name);
- return (s != null) && "true".equalsIgnoreCase(s);
+ return "true".equalsIgnoreCase(s);
}
private static String getString(Map<String, String> map, String name, String dflt) {
@@ -854,7 +794,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
*/
public class ContentHeader {
- private String type = "";
+ private String type;
private Map<String, String> map = new HashMap<>();
ContentHeader() {
@@ -870,7 +810,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
return type;
}
- public String getAttribute(String key) {
+ String getAttribute(String key) {
String s = map.get(key);
if (s == null) {
s = "";
@@ -976,19 +916,17 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
/*
* @Method - getGroupByFeedGroupId- Rally:US708115
* @Params - User to check in group and feedid which is assigned the group.
- * @return - string value grupid/null
+ * @return - string value groupid/null
*/
@Override
public String getGroupByFeedGroupId(String owner, String feedId) {
try {
- int n = Integer.parseInt(feedId);
- Feed f = Feed.getFeedById(n);
+ Feed f = Feed.getFeedById(Integer.parseInt(feedId));
if (f != null) {
int groupid = f.getGroupid();
if (groupid > 0) {
Group group = Group.getGroupById(groupid);
- assert group != null;
- if (isUserMemberOfGroup(group, owner)) {
+ if (group != null && isUserMemberOfGroup(group, owner)) {
return group.getAuthid();
}
}
@@ -1002,7 +940,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
/*
* @Method - getGroupBySubGroupId - Rally:US708115
* @Params - User to check in group and subid which is assigned the group.
- * @return - string value grupid/null
+ * @return - string value groupid/null
*/
@Override
public String getGroupBySubGroupId(String owner, String subId) {
@@ -1013,8 +951,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider {
int groupid = s.getGroupid();
if (groupid > 0) {
Group group = Group.getGroupById(groupid);
- assert group != null;
- if (isUserMemberOfGroup(group, owner)) {
+ if (group != null && isUserMemberOfGroup(group, owner)) {
return group.getAuthid();
}
}
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java
index fff10ac7..357444e4 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java
@@ -57,6 +57,7 @@ public class Parameters extends Syncable {
public static final String PROV_POKETIMER2 = "PROV_POKETIMER2";
public static final String PROV_SPECIAL_SUBNET = "PROV_SPECIAL_SUBNET";
public static final String PROV_LOG_RETENTION = "PROV_LOG_RETENTION";
+ public static final String DEFAULT_LOG_RETENTION = "DEFAULT_LOG_RETENTION";
public static final String NODES = "NODES";
public static final String ACTIVE_POD = "ACTIVE_POD";
public static final String STANDBY_POD = "STANDBY_POD";
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java
index d9f36de3..f59dc919 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java
@@ -20,14 +20,19 @@
* * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* *
******************************************************************************/
-package org.onap.dmaap.datarouter.provisioning.utils;
-import javax.servlet.http.HttpServletResponse;
-import java.io.IOException;
+package org.onap.dmaap.datarouter.provisioning.utils;
import com.att.eelf.configuration.EELFLogger;
+import java.io.IOException;
+import javax.servlet.http.HttpServletResponse;
public class HttpServletUtils {
+
+ private HttpServletUtils(){
+
+ }
+
public static void sendResponseError(HttpServletResponse response, int errorCode, String message, EELFLogger intlogger) {
try {
response.sendError(errorCode, message);
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
index 3ba1a151..c78a5b10 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java
@@ -1,501 +1,498 @@
-/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-
-
-package org.onap.dmaap.datarouter.provisioning.utils;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.io.Reader;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.zip.GZIPInputStream;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.datarouter.provisioning.BaseServlet;
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
-import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
-import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
-
-/**
- * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
- * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
- * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
- * startup to load the old (1.0) style log tables into LOG_RECORDS;
- * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
- * database.
- * This bit set is used to synchronize between provisioning servers.</p>
- *
- * @author Robert Eby
- * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
- */
-public class LogfileLoader extends Thread {
- /**
- * Default number of log records to keep when pruning. Keep 10M by default.
- */
- public static final long DEFAULT_LOG_RETENTION = 10000000L;
- /**
- * NOT USED: Percentage of free space required before old records are removed.
- */
- public static final int REQUIRED_FREE_PCT = 20;
-
- /**
- * This is a singleton -- there is only one LogfileLoader object in the server
- */
- private static LogfileLoader logfileLoader;
-
- /**
- * Get the singleton LogfileLoader object, and start it if it is not running.
- *
- * @return the LogfileLoader
- */
- public static synchronized LogfileLoader getLoader() {
- if (logfileLoader == null)
- logfileLoader = new LogfileLoader();
- if (!logfileLoader.isAlive())
- logfileLoader.start();
- return logfileLoader;
- }
-
- /**
- * The PreparedStatement which is loaded by a <i>Loadable</i>.
- */
- public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
- /**
- * Each server can assign this many IDs
- */
- private static final long SET_SIZE = (1L << 56);
-
- private final EELFLogger logger;
- private final DB db;
- private final String spooldir;
- private final long set_start;
- private final long set_end;
- private RLEBitSet seq_set;
- private long nextid;
- private boolean idle;
-
- private LogfileLoader() {
- this.logger = EELFManager.getInstance().getLogger("InternalLog");
- this.db = new DB();
- this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
- this.set_start = getIdRange();
- this.set_end = set_start + SET_SIZE - 1;
- this.seq_set = new RLEBitSet();
- this.nextid = 0;
- this.idle = false;
-
- // This is a potentially lengthy operation, so has been moved to run()
- //initializeNextid();
- this.setDaemon(true);
- this.setName("LogfileLoader");
- }
-
- private long getIdRange() {
- long n;
- if (BaseServlet.isInitialActivePOD())
- n = 0;
- else if (BaseServlet.isInitialStandbyPOD())
- n = SET_SIZE;
- else
- n = SET_SIZE * 2;
- String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
- logger.debug("This server shall assign RECORD_IDs in the range " + r);
- return n;
- }
-
- /**
- * Return the bit set representing the record ID's that are loaded in this database.
- *
- * @return the bit set
- */
- public RLEBitSet getBitSet() {
- return seq_set;
- }
-
- /**
- * True if the LogfileLoader is currently waiting for work.
- *
- * @return true if idle
- */
- public boolean isIdle() {
- return idle;
- }
-
- /**
- * Run continuously to look for new logfiles in the spool directory and import them into the DB.
- * The spool is checked once per second. If free space on the MariaDB filesystem falls below
- * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
- * table is compacted until free space rises above the threshold.
- */
- @Override
- public void run() {
- initializeNextid(); // moved from the constructor
- while (true) {
- try {
- File dirfile = new File(spooldir);
- while (true) {
- // process IN files
- File[] infiles = dirfile.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("IN.");
- }
- });
-
- if (infiles.length == 0) {
- idle = true;
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- idle = false;
- } else {
- // Remove old rows
- if (pruneRecords()) {
- // Removed at least some entries, recompute the bit map
- initializeNextid();
- }
-
- // Process incoming logfiles
- for (File f : infiles) {
- if (logger.isDebugEnabled())
- logger.debug("PROV8001 Starting " + f + " ...");
- long time = System.currentTimeMillis();
- int[] n = process(f);
- time = System.currentTimeMillis() - time;
- logger.info(String
- .format("PROV8000 Processed %s in %d ms; %d of %d records.",
- f.toString(), time, n[0], n[1]));
- f.delete();
- }
- }
- }
- } catch (Exception e) {
- logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
- }
- }
- }
-
- boolean pruneRecords() {
- boolean did1 = false;
- long count = countRecords();
- long threshold = DEFAULT_LOG_RETENTION;
- Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
- if (param != null) {
- try {
- long n = Long.parseLong(param.getValue());
- // This check is to prevent inadvertent errors from wiping the table out
- if (n > 1000000L)
- threshold = n;
- } catch (NumberFormatException e) {
- // ignore
- }
- }
- logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
- if (count > threshold) {
- count -= threshold; // we need to remove this many records;
- Map<Long, Long> hist = getHistogram(); // histogram of records per day
- // Determine the cutoff point to remove the needed number of records
- long sum = 0;
- long cutoff = 0;
- for (Long day : new TreeSet<Long>(hist.keySet())) {
- sum += hist.get(day);
- cutoff = day;
- if (sum >= count)
- break;
- }
- cutoff++;
- cutoff *= 86400000L; // convert day to ms
- logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
-
- Connection conn = null;
- try {
- // Limit to a million at a time to avoid typing up the DB for too long.
- conn = db.getConnection();
- try(PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
- ps.setLong(1, cutoff);
- while (count > 0) {
- if (!ps.execute()) {
- int dcount = ps.getUpdateCount();
- count -= dcount;
- logger.debug(" " + dcount + " rows deleted.");
- did1 |= (dcount != 0);
- if (dcount == 0)
- count = 0; // prevent inf. loops
- } else {
- count = 0; // shouldn't happen!
- }
- }
- }
- try(Statement stmt = conn.createStatement()) {
- stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
- }
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- }
- return did1;
- }
-
- long countRecords() {
- long count = 0;
- Connection conn = null;
- try {
- conn = db.getConnection();
- try(Statement stmt = conn.createStatement()) {
- try(ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
- if (rs.next()) {
- count = rs.getLong("COUNT");
- }
- }
- }
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- return count;
- }
-
- Map<Long, Long> getHistogram() {
- Map<Long, Long> map = new HashMap<Long, Long>();
- Connection conn = null;
- try {
- logger.debug(" LOG_RECORD table histogram...");
- conn = db.getConnection();
- try(Statement stmt = conn.createStatement()) {
- try(ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
- while (rs.next()) {
- long day = rs.getLong("DAY");
- long cnt = rs.getLong("COUNT");
- map.put(day, cnt);
- logger.debug(" " + day + " " + cnt);
- }
- }
- }
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- return map;
- }
-
- private void initializeNextid() {
- Connection conn = null;
- try {
- conn = db.getConnection();
- RLEBitSet nbs = new RLEBitSet();
- try(Statement stmt = conn.createStatement()) {
- // Build a bitset of all records in the LOG_RECORDS table
- // We need to run this SELECT in stages, because otherwise we run out of memory!
- final long stepsize = 6000000L;
- boolean go_again = true;
- for (long i = 0; go_again; i += stepsize) {
- String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
- try (ResultSet rs = stmt.executeQuery(sql)) {
- go_again = false;
- while (rs.next()) {
- long n = rs.getLong("RECORD_ID");
- nbs.set(n);
- go_again = true;
- }
- }
- }
- }
- seq_set = nbs;
- // Compare with the range for this server
- // Determine the next ID for this set of record IDs
- RLEBitSet tbs = (RLEBitSet) nbs.clone();
- RLEBitSet idset = new RLEBitSet();
- idset.set(set_start, set_start + SET_SIZE);
- tbs.and(idset);
- long t = tbs.length();
- nextid = (t == 0) ? set_start : (t - 1);
- if (nextid >= set_start + SET_SIZE) {
- // Handle wraparound, when the IDs reach the end of our "range"
- Long[] last = null;
- Iterator<Long[]> li = tbs.getRangeIterator();
- while (li.hasNext()) {
- last = li.next();
- }
- if (last != null) {
- tbs.clear(last[0], last[1] + 1);
- t = tbs.length();
- nextid = (t == 0) ? set_start : (t - 1);
- }
- }
- logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- }
-
- @SuppressWarnings("resource")
- int[] process(File f) {
- int ok = 0, total = 0;
- try {
- Connection conn = db.getConnection();
- PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
- Reader r = f.getPath().endsWith(".gz")
- ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
- : new FileReader(f);
- try(LineNumberReader in = new LineNumberReader(r)) {
- String line;
- while ((line = in.readLine()) != null) {
- try {
- for (Loadable rec : buildRecords(line)) {
- rec.load(ps);
- if (rec instanceof LogRecord) {
- LogRecord lr = ((LogRecord) rec);
- if (!seq_set.get(lr.getRecordId())) {
- ps.executeUpdate();
- seq_set.set(lr.getRecordId());
- } else
- logger.debug("Duplicate record ignored: " + lr.getRecordId());
- } else {
- if (++nextid > set_end)
- nextid = set_start;
- ps.setLong(18, nextid);
- ps.executeUpdate();
- seq_set.set(nextid);
- }
- ps.clearParameters();
- ok++;
- }
- } catch (SQLException e) {
- logger.warn("PROV8003 Invalid value in record: " + line);
- logger.debug(e.toString(), e);
- } catch (NumberFormatException e) {
- logger.warn("PROV8004 Invalid number in record: " + line);
- logger.debug(e.toString());
- } catch (ParseException e) {
- logger.warn("PROV8005 Invalid date in record: " + line);
- logger.debug(e.toString());
- } catch (Exception e) {
- logger.warn("PROV8006 Invalid pattern in record: " + line);
- logger.debug(e.toString(), e);
- }
- total++;
- }
- }
- ps.close();
- db.release(conn);
- conn = null;
- } catch (FileNotFoundException e) {
- logger.warn("PROV8007 Exception reading " + f + ": " + e);
- } catch (IOException e) {
- logger.warn("PROV8007 Exception reading " + f + ": " + e);
- } catch (SQLException e) {
- logger.warn("PROV8007 Exception reading " + f + ": " + e);
- }
- return new int[]{ok, total};
- }
-
- Loadable[] buildRecords(String line) throws ParseException {
- String[] pp = line.split("\\|");
- if (pp != null && pp.length >= 7) {
- String rtype = pp[1].toUpperCase();
- if (rtype.equals("PUB") && pp.length == 11) {
- // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
- return new Loadable[]{new PublishRecord(pp)};
- }
- if (rtype.equals("DEL") && pp.length == 12) {
- // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
- String[] subs = pp[4].split("\\s+");
- if (subs != null) {
- Loadable[] rv = new Loadable[subs.length];
- for (int i = 0; i < subs.length; i++) {
- // create a new record for each individual sub
- pp[4] = subs[i];
- rv[i] = new DeliveryRecord(pp);
- }
- return rv;
- }
- }
- if (rtype.equals("EXP") && pp.length == 11) {
- // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
- ExpiryRecord e = new ExpiryRecord(pp);
- if (e.getReason().equals("other"))
- logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
- return new Loadable[]{e};
- }
- if (rtype.equals("PBF") && pp.length == 12) {
- // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
- return new Loadable[]{new PubFailRecord(pp)};
- }
- if (rtype.equals("DLX") && pp.length == 7) {
- // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
- return new Loadable[]{new DeliveryExtraRecord(pp)};
- }
- if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {
- // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
- return new Loadable[]{new LogRecord(pp)};
- }
- }
- logger.warn("PROV8002 bad record: " + line);
- return new Loadable[0];
- }
-
- /**
- * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
- *
- * @param a ignored
- * @throws InterruptedException
- */
- public static void main(String[] a) throws InterruptedException {
- LogfileLoader.getLoader();
- Thread.sleep(200000L);
- }
-}
+/*******************************************************************************
+ * ============LICENSE_START==================================================
+ * * org.onap.dmaap
+ * * ===========================================================================
+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * * ===========================================================================
+ * * Licensed under the Apache License, Version 2.0 (the "License");
+ * * you may not use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ * * ============LICENSE_END====================================================
+ * *
+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ * *
+ ******************************************************************************/
+
+
+package org.onap.dmaap.datarouter.provisioning.utils;
+
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.Reader;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.zip.GZIPInputStream;
+import org.onap.dmaap.datarouter.provisioning.BaseServlet;
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
+import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
+import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
+import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
+
+/**
+ * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
+ * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
+ * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
+ * startup to load the old (1.0) style log tables into LOG_RECORDS;
+ * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
+ * database.
+ * This bit set is used to synchronize between provisioning servers.</p>
+ *
+ * @author Robert Eby
+ * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
+ */
+public class LogfileLoader extends Thread {
+ /**
+ * NOT USED: Percentage of free space required before old records are removed.
+ */
+ public static final int REQUIRED_FREE_PCT = 20;
+
+ /**
+ * This is a singleton -- there is only one LogfileLoader object in the server.
+ */
+ private static LogfileLoader logfileLoader;
+
+ /**
+ * The PreparedStatement which is loaded by a <i>Loadable</i>.
+ */
+ private static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ /**
+ * Each server can assign this many IDs.
+ */
+ private static final long SET_SIZE = (1L << 56);
+
+ private final EELFLogger logger;
+ private final DB db;
+ private final String spooldir;
+ private final long setStart;
+ private final long setEnd;
+ private RLEBitSet seqSet;
+ private long nextId;
+ private boolean idle;
+
+ /**
+ * Get the singleton LogfileLoader object, and start it if it is not running.
+ *
+ * @return the LogfileLoader
+ */
+ public static synchronized LogfileLoader getLoader() {
+ if (logfileLoader == null) {
+ logfileLoader = new LogfileLoader();
+ }
+ if (!logfileLoader.isAlive()) {
+ logfileLoader.start();
+ }
+ return logfileLoader;
+ }
+
+
+ private LogfileLoader() {
+ this.logger = EELFManager.getInstance().getLogger("InternalLog");
+ this.db = new DB();
+ this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
+ this.setStart = getIdRange();
+ this.setEnd = setStart + SET_SIZE - 1;
+ this.seqSet = new RLEBitSet();
+ this.nextId = 0;
+ this.idle = false;
+ this.setDaemon(true);
+ this.setName("LogfileLoader");
+ }
+
+ private long getIdRange() {
+ long n;
+ if (BaseServlet.isInitialActivePOD()) {
+ n = 0;
+ } else if (BaseServlet.isInitialStandbyPOD()) {
+ n = SET_SIZE;
+ } else {
+ n = SET_SIZE * 2;
+ }
+ String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
+ logger.debug("This server shall assign RECORD_IDs in the range " + r);
+ return n;
+ }
+
+ /**
+ * Return the bit set representing the record ID's that are loaded in this database.
+ *
+ * @return the bit set
+ */
+ public RLEBitSet getBitSet() {
+ return seqSet;
+ }
+
+ /**
+ * True if the LogfileLoader is currently waiting for work.
+ *
+ * @return true if idle
+ */
+ public boolean isIdle() {
+ return idle;
+ }
+
+ /**
+ * Run continuously to look for new logfiles in the spool directory and import them into the DB.
+ * The spool is checked once per second. If free space on the MariaDB filesystem falls below
+ * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
+ * table is compacted until free space rises above the threshold.
+ */
+ @Override
+ public void run() {
+ initializeNextid();
+ while (true) {
+ try {
+ File dirfile = new File(spooldir);
+ while (true) {
+ runLogFileLoad(dirfile);
+ }
+ } catch (Exception e) {
+ logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
+ }
+ }
+ }
+
+ private void runLogFileLoad(File filesDir) {
+ File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN."));
+ if (inFiles != null) {
+ if (inFiles.length == 0) {
+ idle = true;
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ idle = false;
+ } else {
+ // Remove old rows
+ if (pruneRecords()) {
+ // Removed at least some entries, recompute the bit map
+ initializeNextid();
+ }
+ for (File file : inFiles) {
+ processFile(file);
+ }
+ }
+ }
+ }
+
+ private void processFile(File infile) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("PROV8001 Starting " + infile + " ...");
+ }
+ long time = System.currentTimeMillis();
+ int[] n = process(infile);
+ time = System.currentTimeMillis() - time;
+ logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.",
+ infile.toString(), time, n[0], n[1]));
+ try {
+ Files.delete(infile.toPath());
+ } catch (IOException e) {
+ logger.info("PROV8001 failed to delete file " + infile.getName(), e);
+ }
+ }
+
+ boolean pruneRecords() {
+ boolean did1 = false;
+ long count = countRecords();
+ Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION);
+ long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L;
+ Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
+ if (provLogRetention != null) {
+ try {
+ long n = Long.parseLong(provLogRetention.getValue());
+ // This check is to prevent inadvertent errors from wiping the table out
+ if (n > 1000000L) {
+ threshold = n;
+ }
+ } catch (NumberFormatException e) {
+ // ignore
+ }
+ }
+ logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
+ if (count > threshold) {
+ // we need to remove this many records
+ count -= threshold;
+ // histogram of records per day
+ Map<Long, Long> hist = getHistogram();
+ // Determine the cutoff point to remove the needed number of records
+ long sum = 0;
+ long cutoff = 0;
+ for (Long day : new TreeSet<>(hist.keySet())) {
+ sum += hist.get(day);
+ cutoff = day;
+ if (sum >= count) {
+ break;
+ }
+ }
+ cutoff++;
+ // convert day to ms
+ cutoff *= 86400000L;
+ logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
+
+ Connection conn = null;
+ try {
+ // Limit to a million at a time to avoid typing up the DB for too long.
+ conn = db.getConnection();
+ try (PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
+ ps.setLong(1, cutoff);
+ while (count > 0) {
+ if (!ps.execute()) {
+ int dcount = ps.getUpdateCount();
+ count -= dcount;
+ logger.debug(" " + dcount + " rows deleted.");
+ did1 |= (dcount != 0);
+ if (dcount == 0) {
+ count = 0; // prevent inf. loops
+ }
+ } else {
+ count = 0; // shouldn't happen!
+ }
+ }
+ }
+ try (Statement stmt = conn.createStatement()) {
+ stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
+ }
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ }
+ return did1;
+ }
+
+ long countRecords() {
+ long count = 0;
+ Connection conn = null;
+ try {
+ conn = db.getConnection();
+ try (Statement stmt = conn.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
+ if (rs.next()) {
+ count = rs.getLong("COUNT");
+ }
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ return count;
+ }
+
+ Map<Long, Long> getHistogram() {
+ Map<Long, Long> map = new HashMap<>();
+ Connection conn = null;
+ try {
+ logger.debug(" LOG_RECORD table histogram...");
+ conn = db.getConnection();
+ try (Statement stmt = conn.createStatement()) {
+ try (ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
+ while (rs.next()) {
+ long day = rs.getLong("DAY");
+ long cnt = rs.getLong("COUNT");
+ map.put(day, cnt);
+ logger.debug(" " + day + " " + cnt);
+ }
+ }
+ }
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ return map;
+ }
+
+ private void initializeNextid() {
+ Connection conn = null;
+ try {
+ conn = db.getConnection();
+ RLEBitSet nbs = new RLEBitSet();
+ try (Statement stmt = conn.createStatement()) {
+ // Build a bitset of all records in the LOG_RECORDS table
+ // We need to run this SELECT in stages, because otherwise we run out of memory!
+ final long stepsize = 6000000L;
+ boolean goAgain = true;
+ for (long i = 0; goAgain; i += stepsize) {
+ String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
+ try (ResultSet rs = stmt.executeQuery(sql)) {
+ goAgain = false;
+ while (rs.next()) {
+ long n = rs.getLong("RECORD_ID");
+ nbs.set(n);
+ goAgain = true;
+ }
+ }
+ }
+ }
+ seqSet = nbs;
+ // Compare with the range for this server
+ // Determine the next ID for this set of record IDs
+ RLEBitSet tbs = (RLEBitSet) nbs.clone();
+ RLEBitSet idset = new RLEBitSet();
+ idset.set(setStart, setStart + SET_SIZE);
+ tbs.and(idset);
+ long t = tbs.length();
+ nextId = (t == 0) ? setStart : (t - 1);
+ if (nextId >= setStart + SET_SIZE) {
+ // Handle wraparound, when the IDs reach the end of our "range"
+ Long[] last = null;
+ Iterator<Long[]> li = tbs.getRangeIterator();
+ while (li.hasNext()) {
+ last = li.next();
+ }
+ if (last != null) {
+ tbs.clear(last[0], last[1] + 1);
+ t = tbs.length();
+ nextId = (t == 0) ? setStart : (t - 1);
+ }
+ }
+ logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextId, nextId));
+ } catch (SQLException e) {
+ logger.error(e.toString());
+ } finally {
+ db.release(conn);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ int[] process(File f) {
+ int ok = 0;
+ int total = 0;
+ try {
+ Connection conn = db.getConnection();
+ PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
+ Reader r = f.getPath().endsWith(".gz")
+ ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
+ : new FileReader(f);
+ try (LineNumberReader in = new LineNumberReader(r)) {
+ String line;
+ while ((line = in.readLine()) != null) {
+ try {
+ for (Loadable rec : buildRecords(line)) {
+ rec.load(ps);
+ if (rec instanceof LogRecord) {
+ LogRecord lr = ((LogRecord) rec);
+ if (!seqSet.get(lr.getRecordId())) {
+ ps.executeUpdate();
+ seqSet.set(lr.getRecordId());
+ } else {
+ logger.debug("Duplicate record ignored: " + lr.getRecordId());
+ }
+ } else {
+ if (++nextId > setEnd) {
+ nextId = setStart;
+ }
+ ps.setLong(18, nextId);
+ ps.executeUpdate();
+ seqSet.set(nextId);
+ }
+ ps.clearParameters();
+ ok++;
+ }
+ } catch (SQLException e) {
+ logger.warn("PROV8003 Invalid value in record: " + line, e);
+ } catch (NumberFormatException e) {
+ logger.warn("PROV8004 Invalid number in record: " + line, e);
+ } catch (ParseException e) {
+ logger.warn("PROV8005 Invalid date in record: " + line, e);
+ } catch (Exception e) {
+ logger.warn("PROV8006 Invalid pattern in record: " + line, e);
+ }
+ total++;
+ }
+ }
+ ps.close();
+ db.release(conn);
+ } catch (SQLException | IOException e) {
+ logger.warn("PROV8007 Exception reading " + f + ": " + e);
+ }
+ return new int[]{ok, total};
+ }
+
+ Loadable[] buildRecords(String line) throws ParseException {
+ String[] pp = line.split("\\|");
+ if (pp != null && pp.length >= 7) {
+ String rtype = pp[1].toUpperCase();
+ if ("PUB".equals(rtype) && pp.length == 11) {
+ // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
+ return new Loadable[]{new PublishRecord(pp)};
+ }
+ if ("DEL".equals(rtype) && pp.length == 12) {
+ // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
+ String[] subs = pp[4].split("\\s+");
+ if (subs != null) {
+ Loadable[] rv = new Loadable[subs.length];
+ for (int i = 0; i < subs.length; i++) {
+ // create a new record for each individual sub
+ pp[4] = subs[i];
+ rv[i] = new DeliveryRecord(pp);
+ }
+ return rv;
+ }
+ }
+ if ("EXP".equals(rtype) && pp.length == 11) {
+ // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
+ ExpiryRecord e = new ExpiryRecord(pp);
+ if ("other".equals(e.getReason())) {
+ logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
+ }
+ return new Loadable[]{e};
+ }
+ if ("PBF".equals(rtype) && pp.length == 12) {
+ // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
+ return new Loadable[]{new PubFailRecord(pp)};
+ }
+ if ("DLX".equals(rtype) && pp.length == 7) {
+ // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
+ return new Loadable[]{new DeliveryExtraRecord(pp)};
+ }
+ if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) {
+ // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
+ return new Loadable[]{new LogRecord(pp)};
+ }
+ }
+ logger.warn("PROV8002 bad record: " + line);
+ return new Loadable[0];
+ }
+
+ /**
+ * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
+ *
+ * @param a ignored
+ */
+ public static void main(String[] a) throws InterruptedException {
+ LogfileLoader.getLoader();
+ Thread.sleep(200000L);
+ }
+}
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java
index 44142031..cb6881fb 100644
--- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java
+++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java
@@ -21,14 +21,15 @@
package org.onap.dmaap.datarouter.provisioning.utils;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.Base64;
+
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.PBEParameterSpec;
-import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
-import java.util.Base64;
/**
* The Processing of a Password. Password can be encrypted and decrypted.
@@ -37,13 +38,14 @@ import java.util.Base64;
*/
public class PasswordProcessor {
- private PasswordProcessor(){}
-
private static final String SECRET_KEY_FACTORY_TYPE = "PBEWithMD5AndDES";
private static final String PASSWORD_ENCRYPTION_STRING = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.passwordencryption");
private static final char[] PASSWORD = PASSWORD_ENCRYPTION_STRING.toCharArray();
private static final byte[] SALT = {(byte) 0xde, (byte) 0x33, (byte) 0x10, (byte) 0x12, (byte) 0xde, (byte) 0x33, (byte) 0x10, (byte) 0x12,};
+ private PasswordProcessor(){
+ }
+
/**
* Encrypt password.
* @param property the Password