diff options
author | efiacor <fiachra.corcoran@est.tech> | 2019-07-16 09:49:13 +0000 |
---|---|---|
committer | efiacor <fiachra.corcoran@est.tech> | 2019-07-16 09:49:13 +0000 |
commit | c87a3bf443d1d71389da4cda76adbddcac26e7a2 (patch) | |
tree | 6b7614985bb633a7cde4ffcdb2ca01ea5ee66bbe /datarouter-prov/src/main/java/org | |
parent | 98572b78fcce9ff28fa7429c9265812bd1e78bf2 (diff) |
Even more unit test and code cleanup
Change-Id: Ide9477f5f8856e4ab35864bdc93d27a2d59afc83
Issue-ID: DMAAP-1226
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Diffstat (limited to 'datarouter-prov/src/main/java/org')
5 files changed, 655 insertions, 713 deletions
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java index ef106ab4..3993b4df 100755 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/BaseServlet.java @@ -24,30 +24,33 @@ package org.onap.dmaap.datarouter.provisioning; +import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN; - import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS; import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME; -import static com.att.eelf.configuration.Configuration.MDC_KEY_REQUEST_ID; - - -import java.io.IOException; -import java.io.InputStream; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import java.net.InetAddress; import java.net.UnknownHostException; +import java.security.GeneralSecurityException; import java.security.cert.X509Certificate; import java.sql.Connection; import java.sql.SQLException; - +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; - -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; import org.apache.commons.lang3.StringUtils; +import org.jetbrains.annotations.Nullable; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -55,21 +58,19 @@ import org.json.JSONTokener; import org.onap.dmaap.datarouter.authz.Authorizer; import org.onap.dmaap.datarouter.authz.impl.ProvAuthorizer; import org.onap.dmaap.datarouter.authz.impl.ProvDataProvider; -import org.onap.dmaap.datarouter.provisioning.beans.*; +import org.onap.dmaap.datarouter.provisioning.beans.Deleteable; +import org.onap.dmaap.datarouter.provisioning.beans.Feed; +import org.onap.dmaap.datarouter.provisioning.beans.Group; +import org.onap.dmaap.datarouter.provisioning.beans.Insertable; +import org.onap.dmaap.datarouter.provisioning.beans.NodeClass; +import org.onap.dmaap.datarouter.provisioning.beans.Parameters; +import org.onap.dmaap.datarouter.provisioning.beans.Subscription; +import org.onap.dmaap.datarouter.provisioning.beans.Updateable; import org.onap.dmaap.datarouter.provisioning.utils.DB; import org.onap.dmaap.datarouter.provisioning.utils.PasswordProcessor; import org.onap.dmaap.datarouter.provisioning.utils.ThrottleFilter; import org.slf4j.MDC; -import javax.mail.*; -import javax.mail.internet.InternetAddress; -import javax.mail.internet.MimeBodyPart; -import javax.mail.internet.MimeMessage; -import javax.mail.internet.MimeMultipart; -import java.security.GeneralSecurityException; -import java.util.*; -import java.util.regex.Pattern; - /** * This is the base class for all Servlets in the provisioning code. It provides standard constants and some common @@ -94,10 +95,10 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { static final String CREATE_PERMISSION = "create"; static final String EDIT_PERMISSION = "edit"; static final String DELETE_PERMISSION = "delete"; - static final String PUBLISH_PERMISSION = "publish"; - static final String SUSPEND_PERMISSION = "suspend"; - static final String RESTORE_PERMISSION = "restore"; - static final String SUBSCRIBE_PERMISSION = "subscribe"; + private static final String PUBLISH_PERMISSION = "publish"; + private static final String SUSPEND_PERMISSION = "suspend"; + private static final String RESTORE_PERMISSION = "restore"; + private static final String SUBSCRIBE_PERMISSION = "subscribe"; static final String APPROVE_SUB_PERMISSION = "approveSub"; static final String FEED_BASECONTENT_TYPE = "application/vnd.dmaap-dr.feed"; @@ -113,7 +114,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { //Adding groups functionality, ...1610 static final String GROUP_BASECONTENT_TYPE = "application/vnd.dmaap-dr.group"; static final String GROUP_CONTENT_TYPE = "application/vnd.dmaap-dr.group; version=2.0"; - public static final String GROUPFULL_CONTENT_TYPE = "application/vnd.dmaap-dr.group-full; version=2.0"; + static final String GROUPFULL_CONTENT_TYPE = "application/vnd.dmaap-dr.group-full; version=2.0"; public static final String GROUPLIST_CONTENT_TYPE = "application/vnd.dmaap-dr.fegrouped-list; version=1.0"; @@ -130,127 +131,123 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { private static final int DEFAULT_POKETIMER2 = 30; private static final String DEFAULT_DOMAIN = "onap"; private static final String DEFAULT_PROVSRVR_NAME = "dmaap-dr-prov"; - private static final String STATIC_ROUTING_NODES = ""; //Adding new param for static Routing - Rally:US664862-1610 //Common Errors - public static final String MISSING_ON_BEHALF = "Missing X-DMAAP-DR-ON-BEHALF-OF header."; - public static final String MISSING_FEED = "Missing or bad feed number."; - public static final String POLICY_ENGINE = "Policy Engine disallows access."; - public static final String UNAUTHORIZED = "Unauthorized."; - public static final String BAD_SUB = "Missing or bad subscription number."; - public static final String BAD_JSON = "Badly formed JSON"; - public static final String BAD_URL = "Bad URL."; + static final String MISSING_ON_BEHALF = "Missing X-DMAAP-DR-ON-BEHALF-OF header."; + static final String MISSING_FEED = "Missing or bad feed number."; + static final String POLICY_ENGINE = "Policy Engine disallows access."; + static final String UNAUTHORIZED = "Unauthorized."; + static final String BAD_SUB = "Missing or bad subscription number."; + static final String BAD_JSON = "Badly formed JSON"; + static final String BAD_URL = "Bad URL."; public static final String API = "/api/"; - public static final String LOGS = "/logs/"; - public static final String TEXT_CT = "text/plain"; - public static final String INGRESS = "/ingress/"; - public static final String EGRESS = "/egress/"; - public static final String NETWORK = "/network/"; - public static final String GROUPID = "groupid"; + static final String LOGS = "/logs/"; + static final String TEXT_CT = "text/plain"; + static final String INGRESS = "/ingress/"; + static final String EGRESS = "/egress/"; + static final String NETWORK = "/network/"; + static final String GROUPID = "groupid"; public static final String FEEDID = "feedid"; - public static final String FEEDIDS = "feedids"; - public static final String SUBID = "subid"; - public static final String EVENT_TYPE = "eventType"; - public static final String OUTPUT_TYPE = "output_type"; - public static final String START_TIME = "start_time"; - public static final String END_TIME = "end_time"; - public static final String REASON_SQL = "reasonSQL"; + static final String FEEDIDS = "feedids"; + static final String SUBID = "subid"; + static final String EVENT_TYPE = "eventType"; + static final String OUTPUT_TYPE = "output_type"; + static final String START_TIME = "start_time"; + static final String END_TIME = "end_time"; + static final String REASON_SQL = "reasonSQL"; /** - * A boolean to trigger one time "provisioning changed" event on startup + * A boolean to trigger one time "provisioning changed" event on startup. */ private static boolean startmsgFlag = true; /** - * This POD should require SSL connections from clients; pulled from the DB (PROV_REQUIRE_SECURE) + * This POD should require SSL connections from clients; pulled from the DB (PROV_REQUIRE_SECURE). */ private static boolean requireSecure = true; /** - * This POD should require signed, recognized certificates from clients; pulled from the DB (PROV_REQUIRE_CERT) + * This POD should require signed, recognized certificates from clients; pulled from the DB (PROV_REQUIRE_CERT). */ private static boolean requireCert = true; /** - * The set of authorized addresses and networks; pulled from the DB (PROV_AUTH_ADDRESSES) + * The set of authorized addresses and networks; pulled from the DB (PROV_AUTH_ADDRESSES). */ private static Set<String> authorizedAddressesAndNetworks = new HashSet<>(); /** - * The set of authorized names; pulled from the DB (PROV_AUTH_SUBJECTS) + * The set of authorized names; pulled from the DB (PROV_AUTH_SUBJECTS). */ private static Set<String> authorizedNames = new HashSet<>(); /** - * The FQDN of the initially "active" provisioning server in this Data Router ecosystem + * The FQDN of the initially "active" provisioning server in this Data Router ecosystem. */ private static String initialActivePod; /** - * The FQDN of the initially "standby" provisioning server in this Data Router ecosystem + * The FQDN of the initially "standby" provisioning server in this Data Router ecosystem. */ private static String initialStandbyPod; /** - * The FQDN of this provisioning server in this Data Router ecosystem + * The FQDN of this provisioning server in this Data Router ecosystem. */ private static String thisPod; /** - * "Timer 1" - used to determine when to notify nodes of provisioning changes + * "Timer 1" - used to determine when to notify nodes of provisioning changes. */ private static long pokeTimer1; /** - * "Timer 2" - used to determine when to notify nodes of provisioning changes + * "Timer 2" - used to determine when to notify nodes of provisioning changes. */ private static long pokeTimer2; /** - * Array of nodes names and/or FQDNs + * Array of nodes names and/or FQDNs. */ private static String[] nodes = new String[0]; /** - * [DATARTR-27] Poke all the DR nodes : Array of nodes names and/or FQDNs + * [DATARTR-27] Poke all the DR nodes : Array of nodes names and/or FQDNs. */ private static String[] drnodes = new String[0]; /** - * Array of node IP addresses + * Array of node IP addresses. */ private static InetAddress[] nodeAddresses = new InetAddress[0]; /** - * Array of POD IP addresses + * Array of POD IP addresses. */ private static InetAddress[] podAddresses = new InetAddress[0]; /** - * The maximum number of feeds allowed; pulled from the DB (PROV_MAXFEED_COUNT) + * The maximum number of feeds allowed; pulled from the DB (PROV_MAXFEED_COUNT). */ static int maxFeeds = 0; /** - * The maximum number of subscriptions allowed; pulled from the DB (PROV_MAXSUB_COUNT) + * The maximum number of subscriptions allowed; pulled from the DB (PROV_MAXSUB_COUNT). */ static int maxSubs = 0; /** - * The current number of feeds in the system + * The current number of feeds in the system. */ static int activeFeeds = 0; /** - * The current number of subscriptions in the system + * The current number of subscriptions in the system. */ static int activeSubs = 0; /** - * The domain used to generate a FQDN from the "bare" node names + * The domain used to generate a FQDN from the "bare" node names. */ private static String provDomain = "web.att.com"; /** - * The standard FQDN of the provisioning server in this Data Router ecosystem + * The standard FQDN of the provisioning server in this Data Router ecosystem. */ private static String provName = "feeds-drtr.web.att.com"; /** - * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem + * The standard FQDN of the ACTIVE_POD provisioning server in this Data Router ecosystem. */ private static String activeProvName = "feeds-drtr.web.att.com"; - //Adding new param for static Routing - Rally:US664862-1610 - private static String staticRoutingNodes = STATIC_ROUTING_NODES; - /** - * This logger is used to log provisioning events + * This logger is used to log provisioning events. */ protected static EELFLogger eventlogger; /** @@ -258,21 +255,17 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { */ protected static EELFLogger intlogger; /** - * Authorizer - interface to the Policy Engine + * Authorizer - interface to the Policy Engine. */ protected static Authorizer authz; /** - * The Synchronizer used to sync active DB to standby one + * The Synchronizer used to sync active DB to standby one. */ private static SynchronizerTask synctask = null; //Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. private InetAddress thishost; private InetAddress loopback; - private static Boolean mailSendFlag = false; - - private static final String MAILCONFIG_FILE = "mail.properties"; - private static Properties mailprops; //DMAAP-597 (Tech Dept) REST request source IP auth relaxation to accommodate OOM kubernetes deploy private static String isAddressAuthEnabled = (new DB()).getProperties() @@ -285,10 +278,10 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { * Initialize data common to all the provisioning server servlets. */ protected BaseServlet() { - if(eventlogger == null) { - this.eventlogger = EELFManager.getInstance().getLogger("EventLog"); + if (eventlogger == null) { + eventlogger = EELFManager.getInstance().getLogger("EventLog"); } - if(intlogger == null) { + if (intlogger == null) { this.intlogger = EELFManager.getInstance().getLogger("InternalLog"); } if (authz == null) { @@ -329,7 +322,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { } /** - * Read the request's input stream and return a JSONObject from it + * Read the request's input stream and return a JSONObject from it. * * @param req the HTTP request * @return the JSONObject, or null if the stream cannot be parsed @@ -348,35 +341,40 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { } /** - * This method encrypt/decrypt the key in the JSON passed by user request inside the authorisation header object in request before logging the JSON. + * This method encrypt/decrypt the key in the JSON passed by user request inside the authorisation + * header object in request before logging the JSON. * - * @param jo- the JSON passed in http request. - * @param maskKey- the key to be masked in the JSON passed. - * @param action- whether to mask the key or unmask it in a JSON passed. + * @param jo the JSON passed in http request. + * @param maskKey the key to be masked in the JSON passed. + * @param action whether to mask the key or unmask it in a JSON passed. * @return the JSONObject, or null if the stream cannot be parsed. */ - public static JSONObject maskJSON(JSONObject jo, String maskKey, boolean action) { + static JSONObject maskJSON(JSONObject jo, String maskKey, boolean action) { if (!jo.isNull("authorization")) { - JSONObject j2 = jo.getJSONObject("authorization"); - JSONArray ja = j2.getJSONArray("endpoint_ids"); - for (int i = 0; i < ja.length(); i++) { - if ((!ja.getJSONObject(i).isNull(maskKey))) { - String password = ja.getJSONObject(i).get(maskKey).toString(); - try { - if (action) { - ja.getJSONObject(i).put(maskKey, PasswordProcessor.encrypt(password)); - } else { - ja.getJSONObject(i).put(maskKey, PasswordProcessor.decrypt(password)); - } - } catch (JSONException | GeneralSecurityException e) { - intlogger.info("Error reading JSON while masking: " + e); - } + JSONArray endpointIds = jo.getJSONObject("authorization").getJSONArray("endpoint_ids"); + for (int index = 0; index < endpointIds.length(); index++) { + if ((!endpointIds.getJSONObject(index).isNull(maskKey))) { + String password = endpointIds.getJSONObject(index).get(maskKey).toString(); + processPassword(maskKey, action, endpointIds, index, password); } } } return jo; } + private static void processPassword(String maskKey, boolean action, JSONArray endpointIds, int index, + String password) { + try { + if (action) { + endpointIds.getJSONObject(index).put(maskKey, PasswordProcessor.encrypt(password)); + } else { + endpointIds.getJSONObject(index).put(maskKey, PasswordProcessor.decrypt(password)); + } + } catch (JSONException | GeneralSecurityException e) { + intlogger.info("Error reading JSON while masking: " + e); + } + } + /** * Check if the remote host is authorized to perform provisioning. Is the request secure? Is it coming from an * authorized IP address or network (configured via PROV_AUTH_ADDRESSES)? Does it have a valid client certificate @@ -393,20 +391,9 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { if (requireSecure && !request.isSecure()) { return "Request must be made over an HTTPS connection."; } - // Is remote IP authorized? - String remote = request.getRemoteAddr(); - try { - boolean found = false; - InetAddress ip = InetAddress.getByName(remote); - for (String addrnet : authorizedAddressesAndNetworks) { - found |= addressMatchesNetwork(ip, addrnet); - } - if (!found) { - return "Unauthorized address: " + remote; - } - } catch (UnknownHostException e) { - intlogger.error("PROV0051 BaseServlet.isAuthorizedForProvisioning: " + e.getMessage(), e); - return "Unauthorized address: " + remote; + String remoteHostCheck = checkRemoteHostAuthorization(request); + if (remoteHostCheck != null) { + return remoteHostCheck; } // Does remote have a valid certificate? if (requireCert) { @@ -425,6 +412,26 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { return null; } + @Nullable + private String checkRemoteHostAuthorization(HttpServletRequest request) { + // Is remote IP authorized? + String remote = request.getRemoteAddr(); + try { + boolean found = false; + InetAddress ip = InetAddress.getByName(remote); + for (String addrnet : authorizedAddressesAndNetworks) { + found |= addressMatchesNetwork(ip, addrnet); + } + if (!found) { + return "Unauthorized address: " + remote; + } + } catch (UnknownHostException e) { + intlogger.error("PROV0051 BaseServlet.isAuthorizedForProvisioning: " + e.getMessage(), e); + return "Unauthorized address: " + remote; + } + return null; + } + /** * Check if the remote IP address is authorized to see the /internal URL tree. * @@ -438,19 +445,19 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { } InetAddress ip = InetAddress.getByName(request.getRemoteAddr()); for (InetAddress node : getNodeAddresses()) { - if (node != null && ip.equals(node)) { + if (ip.equals(node)) { return true; } } for (InetAddress pod : getPodAddresses()) { - if (pod != null && ip.equals(pod)) { + if (ip.equals(pod)) { return true; } } - if (thishost != null && ip.equals(thishost)) { + if (ip.equals(thishost)) { return true; } - if (loopback != null && ip.equals(loopback)) { + if (ip.equals(loopback)) { return true; } } catch (UnknownHostException e) { @@ -468,7 +475,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { */ private static boolean addressMatchesNetwork(InetAddress ip, String s) { int mlen = -1; - int n = s.indexOf("/"); + int n = s.indexOf('/'); if (n >= 0) { mlen = Integer.parseInt(s.substring(n + 1)); s = s.substring(0, n); @@ -528,16 +535,16 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { maxSubs = getInt(map, Parameters.PROV_MAXSUB_COUNT, DEFAULT_MAX_SUBS); pokeTimer1 = getInt(map, Parameters.PROV_POKETIMER1, DEFAULT_POKETIMER1); pokeTimer2 = getInt(map, Parameters.PROV_POKETIMER2, DEFAULT_POKETIMER2); - /** - * The domain used to generate a FQDN from the "bare" node names - */ + + // The domain used to generate a FQDN from the "bare" node names provDomain = getString(map, Parameters.PROV_DOMAIN, DEFAULT_DOMAIN); provName = getString(map, Parameters.PROV_NAME, DEFAULT_PROVSRVR_NAME); activeProvName = getString(map, Parameters.PROV_ACTIVE_NAME, provName); initialActivePod = getString(map, Parameters.ACTIVE_POD, ""); initialStandbyPod = getString(map, Parameters.STANDBY_POD, ""); - staticRoutingNodes = getString(map, Parameters.STATIC_ROUTING_NODES, - ""); //Adding new param for static Routing - Rally:US664862-1610 + + //Adding new param for static Routing - Rally:US664862-1610 + String staticRoutingNodes = getString(map, Parameters.STATIC_ROUTING_NODES, ""); activeFeeds = Feed.countActiveFeeds(); activeSubs = Subscription.countActiveSubscriptions(); try { @@ -597,78 +604,11 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { } } - - /** - * Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. Load mail properties. - * - * @author vs215k - **/ - private void loadMailProperties() { - if (mailprops == null) { - mailprops = new Properties(); - try (InputStream inStream = getClass().getClassLoader().getResourceAsStream(MAILCONFIG_FILE)) { - mailprops.load(inStream); - } catch (IOException e) { - intlogger.error("PROV9003 Opening properties: " + e.getMessage(), e); - System.exit(1); - } - } - } - - - /** - * Data Router Subscriber HTTPS Relaxation feature USERSTORYID:US674047. - * - * @param email - list of email ids to notify if HTTP relexcation is enabled. - * @author vs215k - **/ - private void notifyPSTeam(String email) { - loadMailProperties(); //Load HTTPS Relex mail properties. - String[] emails = email.split(Pattern.quote("|")); - - Properties mailproperties = new Properties(); - mailproperties.put("mail.smtp.host", mailprops.get("com.att.dmaap.datarouter.mail.server")); - mailproperties.put("mail.transport.protocol", mailprops.get("com.att.dmaap.datarouter.mail.protocol")); - - Session session = Session.getDefaultInstance(mailproperties, null); - Multipart mp = new MimeMultipart(); - MimeBodyPart htmlPart = new MimeBodyPart(); - - try { - - Message msg = new MimeMessage(session); - msg.setFrom(new InternetAddress(mailprops.get("com.att.dmaap.datarouter.mail.from").toString())); - - InternetAddress[] addressTo = new InternetAddress[emails.length]; - for (int x = 0; x < emails.length; x++) { - addressTo[x] = new InternetAddress(emails[x]); - } - - msg.addRecipients(Message.RecipientType.TO, addressTo); - msg.setSubject(mailprops.get("com.att.dmaap.datarouter.mail.subject").toString()); - htmlPart.setContent(mailprops.get("com.att.dmaap.datarouter.mail.body").toString() - .replace("[SERVER]", InetAddress.getLocalHost().getHostName()), "text/html"); - mp.addBodyPart(htmlPart); - msg.setContent(mp); - - intlogger.info(mailprops.get("com.att.dmaap.datarouter.mail.body").toString() - .replace("[SERVER]", InetAddress.getLocalHost().getHostName())); - - Transport.send(msg); - intlogger.info("HTTPS relaxation mail is sent to - : " + email); - - } catch (MessagingException e) { - intlogger.error("Invalid email address, unable to send https relaxation mail to - : " + email, e); - } catch (UnknownHostException uhe) { - intlogger.error("UnknownHostException", uhe); - } - } - public static String getProvName() { return provName; } - public static String getActiveProvName() { + static String getActiveProvName() { return activeProvName; } @@ -696,7 +636,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { * * @return an array of InetAddresses */ - public static InetAddress[] getNodeAddresses() { + private static InetAddress[] getNodeAddresses() { return nodeAddresses; } @@ -814,7 +754,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { private static boolean getBoolean(Map<String, String> map, String name) { String s = map.get(name); - return (s != null) && "true".equalsIgnoreCase(s); + return "true".equalsIgnoreCase(s); } private static String getString(Map<String, String> map, String name, String dflt) { @@ -854,7 +794,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { */ public class ContentHeader { - private String type = ""; + private String type; private Map<String, String> map = new HashMap<>(); ContentHeader() { @@ -870,7 +810,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { return type; } - public String getAttribute(String key) { + String getAttribute(String key) { String s = map.get(key); if (s == null) { s = ""; @@ -976,19 +916,17 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { /* * @Method - getGroupByFeedGroupId- Rally:US708115 * @Params - User to check in group and feedid which is assigned the group. - * @return - string value grupid/null + * @return - string value groupid/null */ @Override public String getGroupByFeedGroupId(String owner, String feedId) { try { - int n = Integer.parseInt(feedId); - Feed f = Feed.getFeedById(n); + Feed f = Feed.getFeedById(Integer.parseInt(feedId)); if (f != null) { int groupid = f.getGroupid(); if (groupid > 0) { Group group = Group.getGroupById(groupid); - assert group != null; - if (isUserMemberOfGroup(group, owner)) { + if (group != null && isUserMemberOfGroup(group, owner)) { return group.getAuthid(); } } @@ -1002,7 +940,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { /* * @Method - getGroupBySubGroupId - Rally:US708115 * @Params - User to check in group and subid which is assigned the group. - * @return - string value grupid/null + * @return - string value groupid/null */ @Override public String getGroupBySubGroupId(String owner, String subId) { @@ -1013,8 +951,7 @@ public class BaseServlet extends HttpServlet implements ProvDataProvider { int groupid = s.getGroupid(); if (groupid > 0) { Group group = Group.getGroupById(groupid); - assert group != null; - if (isUserMemberOfGroup(group, owner)) { + if (group != null && isUserMemberOfGroup(group, owner)) { return group.getAuthid(); } } diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java index fff10ac7..357444e4 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/beans/Parameters.java @@ -57,6 +57,7 @@ public class Parameters extends Syncable { public static final String PROV_POKETIMER2 = "PROV_POKETIMER2";
public static final String PROV_SPECIAL_SUBNET = "PROV_SPECIAL_SUBNET";
public static final String PROV_LOG_RETENTION = "PROV_LOG_RETENTION";
+ public static final String DEFAULT_LOG_RETENTION = "DEFAULT_LOG_RETENTION";
public static final String NODES = "NODES";
public static final String ACTIVE_POD = "ACTIVE_POD";
public static final String STANDBY_POD = "STANDBY_POD";
diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java index d9f36de3..f59dc919 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/HttpServletUtils.java @@ -20,14 +20,19 @@ * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * * ******************************************************************************/ -package org.onap.dmaap.datarouter.provisioning.utils; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; +package org.onap.dmaap.datarouter.provisioning.utils; import com.att.eelf.configuration.EELFLogger; +import java.io.IOException; +import javax.servlet.http.HttpServletResponse; public class HttpServletUtils { + + private HttpServletUtils(){ + + } + public static void sendResponseError(HttpServletResponse response, int errorCode, String message, EELFLogger intlogger) { try { response.sendError(errorCode, message); diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java index 3ba1a151..c78a5b10 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoader.java @@ -1,501 +1,498 @@ -/*******************************************************************************
- * ============LICENSE_START==================================================
- * * org.onap.dmaap
- * * ===========================================================================
- * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
- * * ===========================================================================
- * * Licensed under the Apache License, Version 2.0 (the "License");
- * * you may not use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- * * ============LICENSE_END====================================================
- * *
- * * ECOMP is a trademark and service mark of AT&T Intellectual Property.
- * *
- ******************************************************************************/
-
-
-package org.onap.dmaap.datarouter.provisioning.utils;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.LineNumberReader;
-import java.io.Reader;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.text.ParseException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.zip.GZIPInputStream;
-
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-import org.onap.dmaap.datarouter.provisioning.BaseServlet;
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.Loadable;
-import org.onap.dmaap.datarouter.provisioning.beans.LogRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.Parameters;
-import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord;
-import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord;
-
-/**
- * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir.
- * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be
- * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at
- * startup to load the old (1.0) style log tables into LOG_RECORDS;
- * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the
- * database.
- * This bit set is used to synchronize between provisioning servers.</p>
- *
- * @author Robert Eby
- * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $
- */
-public class LogfileLoader extends Thread {
- /**
- * Default number of log records to keep when pruning. Keep 10M by default.
- */
- public static final long DEFAULT_LOG_RETENTION = 10000000L;
- /**
- * NOT USED: Percentage of free space required before old records are removed.
- */
- public static final int REQUIRED_FREE_PCT = 20;
-
- /**
- * This is a singleton -- there is only one LogfileLoader object in the server
- */
- private static LogfileLoader logfileLoader;
-
- /**
- * Get the singleton LogfileLoader object, and start it if it is not running.
- *
- * @return the LogfileLoader
- */
- public static synchronized LogfileLoader getLoader() {
- if (logfileLoader == null)
- logfileLoader = new LogfileLoader();
- if (!logfileLoader.isAlive())
- logfileLoader.start();
- return logfileLoader;
- }
-
- /**
- * The PreparedStatement which is loaded by a <i>Loadable</i>.
- */
- public static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
- /**
- * Each server can assign this many IDs
- */
- private static final long SET_SIZE = (1L << 56);
-
- private final EELFLogger logger;
- private final DB db;
- private final String spooldir;
- private final long set_start;
- private final long set_end;
- private RLEBitSet seq_set;
- private long nextid;
- private boolean idle;
-
- private LogfileLoader() {
- this.logger = EELFManager.getInstance().getLogger("InternalLog");
- this.db = new DB();
- this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir");
- this.set_start = getIdRange();
- this.set_end = set_start + SET_SIZE - 1;
- this.seq_set = new RLEBitSet();
- this.nextid = 0;
- this.idle = false;
-
- // This is a potentially lengthy operation, so has been moved to run()
- //initializeNextid();
- this.setDaemon(true);
- this.setName("LogfileLoader");
- }
-
- private long getIdRange() {
- long n;
- if (BaseServlet.isInitialActivePOD())
- n = 0;
- else if (BaseServlet.isInitialStandbyPOD())
- n = SET_SIZE;
- else
- n = SET_SIZE * 2;
- String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1);
- logger.debug("This server shall assign RECORD_IDs in the range " + r);
- return n;
- }
-
- /**
- * Return the bit set representing the record ID's that are loaded in this database.
- *
- * @return the bit set
- */
- public RLEBitSet getBitSet() {
- return seq_set;
- }
-
- /**
- * True if the LogfileLoader is currently waiting for work.
- *
- * @return true if idle
- */
- public boolean isIdle() {
- return idle;
- }
-
- /**
- * Run continuously to look for new logfiles in the spool directory and import them into the DB.
- * The spool is checked once per second. If free space on the MariaDB filesystem falls below
- * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS
- * table is compacted until free space rises above the threshold.
- */
- @Override
- public void run() {
- initializeNextid(); // moved from the constructor
- while (true) {
- try {
- File dirfile = new File(spooldir);
- while (true) {
- // process IN files
- File[] infiles = dirfile.listFiles(new FilenameFilter() {
- @Override
- public boolean accept(File dir, String name) {
- return name.startsWith("IN.");
- }
- });
-
- if (infiles.length == 0) {
- idle = true;
- try {
- Thread.sleep(1000L);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- idle = false;
- } else {
- // Remove old rows
- if (pruneRecords()) {
- // Removed at least some entries, recompute the bit map
- initializeNextid();
- }
-
- // Process incoming logfiles
- for (File f : infiles) {
- if (logger.isDebugEnabled())
- logger.debug("PROV8001 Starting " + f + " ...");
- long time = System.currentTimeMillis();
- int[] n = process(f);
- time = System.currentTimeMillis() - time;
- logger.info(String
- .format("PROV8000 Processed %s in %d ms; %d of %d records.",
- f.toString(), time, n[0], n[1]));
- f.delete();
- }
- }
- }
- } catch (Exception e) {
- logger.warn("PROV0020: Caught exception in LogfileLoader: " + e);
- }
- }
- }
-
- boolean pruneRecords() {
- boolean did1 = false;
- long count = countRecords();
- long threshold = DEFAULT_LOG_RETENTION;
- Parameters param = Parameters.getParameter(Parameters.PROV_LOG_RETENTION);
- if (param != null) {
- try {
- long n = Long.parseLong(param.getValue());
- // This check is to prevent inadvertent errors from wiping the table out
- if (n > 1000000L)
- threshold = n;
- } catch (NumberFormatException e) {
- // ignore
- }
- }
- logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold);
- if (count > threshold) {
- count -= threshold; // we need to remove this many records;
- Map<Long, Long> hist = getHistogram(); // histogram of records per day
- // Determine the cutoff point to remove the needed number of records
- long sum = 0;
- long cutoff = 0;
- for (Long day : new TreeSet<Long>(hist.keySet())) {
- sum += hist.get(day);
- cutoff = day;
- if (sum >= count)
- break;
- }
- cutoff++;
- cutoff *= 86400000L; // convert day to ms
- logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")");
-
- Connection conn = null;
- try {
- // Limit to a million at a time to avoid typing up the DB for too long.
- conn = db.getConnection();
- try(PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) {
- ps.setLong(1, cutoff);
- while (count > 0) {
- if (!ps.execute()) {
- int dcount = ps.getUpdateCount();
- count -= dcount;
- logger.debug(" " + dcount + " rows deleted.");
- did1 |= (dcount != 0);
- if (dcount == 0)
- count = 0; // prevent inf. loops
- } else {
- count = 0; // shouldn't happen!
- }
- }
- }
- try(Statement stmt = conn.createStatement()) {
- stmt.execute("OPTIMIZE TABLE LOG_RECORDS");
- }
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- }
- return did1;
- }
-
- long countRecords() {
- long count = 0;
- Connection conn = null;
- try {
- conn = db.getConnection();
- try(Statement stmt = conn.createStatement()) {
- try(ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) {
- if (rs.next()) {
- count = rs.getLong("COUNT");
- }
- }
- }
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- return count;
- }
-
- Map<Long, Long> getHistogram() {
- Map<Long, Long> map = new HashMap<Long, Long>();
- Connection conn = null;
- try {
- logger.debug(" LOG_RECORD table histogram...");
- conn = db.getConnection();
- try(Statement stmt = conn.createStatement()) {
- try(ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) {
- while (rs.next()) {
- long day = rs.getLong("DAY");
- long cnt = rs.getLong("COUNT");
- map.put(day, cnt);
- logger.debug(" " + day + " " + cnt);
- }
- }
- }
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- return map;
- }
-
- private void initializeNextid() {
- Connection conn = null;
- try {
- conn = db.getConnection();
- RLEBitSet nbs = new RLEBitSet();
- try(Statement stmt = conn.createStatement()) {
- // Build a bitset of all records in the LOG_RECORDS table
- // We need to run this SELECT in stages, because otherwise we run out of memory!
- final long stepsize = 6000000L;
- boolean go_again = true;
- for (long i = 0; go_again; i += stepsize) {
- String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize);
- try (ResultSet rs = stmt.executeQuery(sql)) {
- go_again = false;
- while (rs.next()) {
- long n = rs.getLong("RECORD_ID");
- nbs.set(n);
- go_again = true;
- }
- }
- }
- }
- seq_set = nbs;
- // Compare with the range for this server
- // Determine the next ID for this set of record IDs
- RLEBitSet tbs = (RLEBitSet) nbs.clone();
- RLEBitSet idset = new RLEBitSet();
- idset.set(set_start, set_start + SET_SIZE);
- tbs.and(idset);
- long t = tbs.length();
- nextid = (t == 0) ? set_start : (t - 1);
- if (nextid >= set_start + SET_SIZE) {
- // Handle wraparound, when the IDs reach the end of our "range"
- Long[] last = null;
- Iterator<Long[]> li = tbs.getRangeIterator();
- while (li.hasNext()) {
- last = li.next();
- }
- if (last != null) {
- tbs.clear(last[0], last[1] + 1);
- t = tbs.length();
- nextid = (t == 0) ? set_start : (t - 1);
- }
- }
- logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextid, nextid));
- } catch (SQLException e) {
- System.err.println(e);
- logger.error(e.toString());
- } finally {
- db.release(conn);
- }
- }
-
- @SuppressWarnings("resource")
- int[] process(File f) {
- int ok = 0, total = 0;
- try {
- Connection conn = db.getConnection();
- PreparedStatement ps = conn.prepareStatement(INSERT_SQL);
- Reader r = f.getPath().endsWith(".gz")
- ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f)))
- : new FileReader(f);
- try(LineNumberReader in = new LineNumberReader(r)) {
- String line;
- while ((line = in.readLine()) != null) {
- try {
- for (Loadable rec : buildRecords(line)) {
- rec.load(ps);
- if (rec instanceof LogRecord) {
- LogRecord lr = ((LogRecord) rec);
- if (!seq_set.get(lr.getRecordId())) {
- ps.executeUpdate();
- seq_set.set(lr.getRecordId());
- } else
- logger.debug("Duplicate record ignored: " + lr.getRecordId());
- } else {
- if (++nextid > set_end)
- nextid = set_start;
- ps.setLong(18, nextid);
- ps.executeUpdate();
- seq_set.set(nextid);
- }
- ps.clearParameters();
- ok++;
- }
- } catch (SQLException e) {
- logger.warn("PROV8003 Invalid value in record: " + line);
- logger.debug(e.toString(), e);
- } catch (NumberFormatException e) {
- logger.warn("PROV8004 Invalid number in record: " + line);
- logger.debug(e.toString());
- } catch (ParseException e) {
- logger.warn("PROV8005 Invalid date in record: " + line);
- logger.debug(e.toString());
- } catch (Exception e) {
- logger.warn("PROV8006 Invalid pattern in record: " + line);
- logger.debug(e.toString(), e);
- }
- total++;
- }
- }
- ps.close();
- db.release(conn);
- conn = null;
- } catch (FileNotFoundException e) {
- logger.warn("PROV8007 Exception reading " + f + ": " + e);
- } catch (IOException e) {
- logger.warn("PROV8007 Exception reading " + f + ": " + e);
- } catch (SQLException e) {
- logger.warn("PROV8007 Exception reading " + f + ": " + e);
- }
- return new int[]{ok, total};
- }
-
- Loadable[] buildRecords(String line) throws ParseException {
- String[] pp = line.split("\\|");
- if (pp != null && pp.length >= 7) {
- String rtype = pp[1].toUpperCase();
- if (rtype.equals("PUB") && pp.length == 11) {
- // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status
- return new Loadable[]{new PublishRecord(pp)};
- }
- if (rtype.equals("DEL") && pp.length == 12) {
- // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid
- String[] subs = pp[4].split("\\s+");
- if (subs != null) {
- Loadable[] rv = new Loadable[subs.length];
- for (int i = 0; i < subs.length; i++) {
- // create a new record for each individual sub
- pp[4] = subs[i];
- rv[i] = new DeliveryRecord(pp);
- }
- return rv;
- }
- }
- if (rtype.equals("EXP") && pp.length == 11) {
- // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts
- ExpiryRecord e = new ExpiryRecord(pp);
- if (e.getReason().equals("other"))
- logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId());
- return new Loadable[]{e};
- }
- if (rtype.equals("PBF") && pp.length == 12) {
- // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error
- return new Loadable[]{new PubFailRecord(pp)};
- }
- if (rtype.equals("DLX") && pp.length == 7) {
- // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent
- return new Loadable[]{new DeliveryExtraRecord(pp)};
- }
- if (rtype.equals("LOG") && (pp.length == 19 || pp.length == 20)) {
- // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id
- return new Loadable[]{new LogRecord(pp)};
- }
- }
- logger.warn("PROV8002 bad record: " + line);
- return new Loadable[0];
- }
-
- /**
- * The LogfileLoader can be run stand-alone by invoking the main() method of this class.
- *
- * @param a ignored
- * @throws InterruptedException
- */
- public static void main(String[] a) throws InterruptedException {
- LogfileLoader.getLoader();
- Thread.sleep(200000L);
- }
-}
+/******************************************************************************* + * ============LICENSE_START================================================== + * * org.onap.dmaap + * * =========================================================================== + * * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * * =========================================================================== + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * * ============LICENSE_END==================================================== + * * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * * + ******************************************************************************/ + + +package org.onap.dmaap.datarouter.provisioning.utils; + +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.io.Reader; +import java.nio.file.Files; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeSet; +import java.util.zip.GZIPInputStream; +import org.onap.dmaap.datarouter.provisioning.BaseServlet; +import org.onap.dmaap.datarouter.provisioning.beans.DeliveryExtraRecord; +import org.onap.dmaap.datarouter.provisioning.beans.DeliveryRecord; +import org.onap.dmaap.datarouter.provisioning.beans.ExpiryRecord; +import org.onap.dmaap.datarouter.provisioning.beans.Loadable; +import org.onap.dmaap.datarouter.provisioning.beans.LogRecord; +import org.onap.dmaap.datarouter.provisioning.beans.Parameters; +import org.onap.dmaap.datarouter.provisioning.beans.PubFailRecord; +import org.onap.dmaap.datarouter.provisioning.beans.PublishRecord; + +/** + * This class provides methods that run in a separate thread, in order to process logfiles uploaded into the spooldir. + * These logfiles are loaded into the MariaDB LOG_RECORDS table. In a running provisioning server, there should only be + * two places where records can be loaded into this table; here, and in the method DB.retroFit4() which may be run at + * startup to load the old (1.0) style log tables into LOG_RECORDS; + * <p>This method maintains an {@link RLEBitSet} which can be used to easily see what records are presently in the + * database. + * This bit set is used to synchronize between provisioning servers.</p> + * + * @author Robert Eby + * @version $Id: LogfileLoader.java,v 1.22 2014/03/12 19:45:41 eby Exp $ + */ +public class LogfileLoader extends Thread { + /** + * NOT USED: Percentage of free space required before old records are removed. + */ + public static final int REQUIRED_FREE_PCT = 20; + + /** + * This is a singleton -- there is only one LogfileLoader object in the server. + */ + private static LogfileLoader logfileLoader; + + /** + * The PreparedStatement which is loaded by a <i>Loadable</i>. + */ + private static final String INSERT_SQL = "insert into LOG_RECORDS values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + /** + * Each server can assign this many IDs. + */ + private static final long SET_SIZE = (1L << 56); + + private final EELFLogger logger; + private final DB db; + private final String spooldir; + private final long setStart; + private final long setEnd; + private RLEBitSet seqSet; + private long nextId; + private boolean idle; + + /** + * Get the singleton LogfileLoader object, and start it if it is not running. + * + * @return the LogfileLoader + */ + public static synchronized LogfileLoader getLoader() { + if (logfileLoader == null) { + logfileLoader = new LogfileLoader(); + } + if (!logfileLoader.isAlive()) { + logfileLoader.start(); + } + return logfileLoader; + } + + + private LogfileLoader() { + this.logger = EELFManager.getInstance().getLogger("InternalLog"); + this.db = new DB(); + this.spooldir = db.getProperties().getProperty("org.onap.dmaap.datarouter.provserver.spooldir"); + this.setStart = getIdRange(); + this.setEnd = setStart + SET_SIZE - 1; + this.seqSet = new RLEBitSet(); + this.nextId = 0; + this.idle = false; + this.setDaemon(true); + this.setName("LogfileLoader"); + } + + private long getIdRange() { + long n; + if (BaseServlet.isInitialActivePOD()) { + n = 0; + } else if (BaseServlet.isInitialStandbyPOD()) { + n = SET_SIZE; + } else { + n = SET_SIZE * 2; + } + String r = String.format("[%X .. %X]", n, n + SET_SIZE - 1); + logger.debug("This server shall assign RECORD_IDs in the range " + r); + return n; + } + + /** + * Return the bit set representing the record ID's that are loaded in this database. + * + * @return the bit set + */ + public RLEBitSet getBitSet() { + return seqSet; + } + + /** + * True if the LogfileLoader is currently waiting for work. + * + * @return true if idle + */ + public boolean isIdle() { + return idle; + } + + /** + * Run continuously to look for new logfiles in the spool directory and import them into the DB. + * The spool is checked once per second. If free space on the MariaDB filesystem falls below + * REQUIRED_FREE_PCT (normally 20%) then the oldest logfile entries are removed and the LOG_RECORDS + * table is compacted until free space rises above the threshold. + */ + @Override + public void run() { + initializeNextid(); + while (true) { + try { + File dirfile = new File(spooldir); + while (true) { + runLogFileLoad(dirfile); + } + } catch (Exception e) { + logger.warn("PROV0020: Caught exception in LogfileLoader: " + e); + } + } + } + + private void runLogFileLoad(File filesDir) { + File[] inFiles = filesDir.listFiles((dir, name) -> name.startsWith("IN.")); + if (inFiles != null) { + if (inFiles.length == 0) { + idle = true; + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + idle = false; + } else { + // Remove old rows + if (pruneRecords()) { + // Removed at least some entries, recompute the bit map + initializeNextid(); + } + for (File file : inFiles) { + processFile(file); + } + } + } + } + + private void processFile(File infile) { + if (logger.isDebugEnabled()) { + logger.debug("PROV8001 Starting " + infile + " ..."); + } + long time = System.currentTimeMillis(); + int[] n = process(infile); + time = System.currentTimeMillis() - time; + logger.info(String.format("PROV8000 Processed %s in %d ms; %d of %d records.", + infile.toString(), time, n[0], n[1])); + try { + Files.delete(infile.toPath()); + } catch (IOException e) { + logger.info("PROV8001 failed to delete file " + infile.getName(), e); + } + } + + boolean pruneRecords() { + boolean did1 = false; + long count = countRecords(); + Parameters defaultLogRetention = Parameters.getParameter(Parameters.DEFAULT_LOG_RETENTION); + long threshold = (defaultLogRetention != null) ? Long.parseLong(defaultLogRetention.getValue()) : 1000000L; + Parameters provLogRetention = Parameters.getParameter(Parameters.PROV_LOG_RETENTION); + if (provLogRetention != null) { + try { + long n = Long.parseLong(provLogRetention.getValue()); + // This check is to prevent inadvertent errors from wiping the table out + if (n > 1000000L) { + threshold = n; + } + } catch (NumberFormatException e) { + // ignore + } + } + logger.debug("Pruning LOG_RECORD table: records in DB=" + count + ", threshold=" + threshold); + if (count > threshold) { + // we need to remove this many records + count -= threshold; + // histogram of records per day + Map<Long, Long> hist = getHistogram(); + // Determine the cutoff point to remove the needed number of records + long sum = 0; + long cutoff = 0; + for (Long day : new TreeSet<>(hist.keySet())) { + sum += hist.get(day); + cutoff = day; + if (sum >= count) { + break; + } + } + cutoff++; + // convert day to ms + cutoff *= 86400000L; + logger.debug(" Pruning records older than=" + (cutoff / 86400000L) + " (" + new Date(cutoff) + ")"); + + Connection conn = null; + try { + // Limit to a million at a time to avoid typing up the DB for too long. + conn = db.getConnection(); + try (PreparedStatement ps = conn.prepareStatement("DELETE from LOG_RECORDS where EVENT_TIME < ? limit 1000000")) { + ps.setLong(1, cutoff); + while (count > 0) { + if (!ps.execute()) { + int dcount = ps.getUpdateCount(); + count -= dcount; + logger.debug(" " + dcount + " rows deleted."); + did1 |= (dcount != 0); + if (dcount == 0) { + count = 0; // prevent inf. loops + } + } else { + count = 0; // shouldn't happen! + } + } + } + try (Statement stmt = conn.createStatement()) { + stmt.execute("OPTIMIZE TABLE LOG_RECORDS"); + } + } catch (SQLException e) { + logger.error(e.toString()); + } finally { + db.release(conn); + } + } + return did1; + } + + long countRecords() { + long count = 0; + Connection conn = null; + try { + conn = db.getConnection(); + try (Statement stmt = conn.createStatement()) { + try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) as COUNT from LOG_RECORDS")) { + if (rs.next()) { + count = rs.getLong("COUNT"); + } + } + } + } catch (SQLException e) { + logger.error(e.toString()); + } finally { + db.release(conn); + } + return count; + } + + Map<Long, Long> getHistogram() { + Map<Long, Long> map = new HashMap<>(); + Connection conn = null; + try { + logger.debug(" LOG_RECORD table histogram..."); + conn = db.getConnection(); + try (Statement stmt = conn.createStatement()) { + try (ResultSet rs = stmt.executeQuery("SELECT FLOOR(EVENT_TIME/86400000) AS DAY, COUNT(*) AS COUNT FROM LOG_RECORDS GROUP BY DAY")) { + while (rs.next()) { + long day = rs.getLong("DAY"); + long cnt = rs.getLong("COUNT"); + map.put(day, cnt); + logger.debug(" " + day + " " + cnt); + } + } + } + } catch (SQLException e) { + logger.error(e.toString()); + } finally { + db.release(conn); + } + return map; + } + + private void initializeNextid() { + Connection conn = null; + try { + conn = db.getConnection(); + RLEBitSet nbs = new RLEBitSet(); + try (Statement stmt = conn.createStatement()) { + // Build a bitset of all records in the LOG_RECORDS table + // We need to run this SELECT in stages, because otherwise we run out of memory! + final long stepsize = 6000000L; + boolean goAgain = true; + for (long i = 0; goAgain; i += stepsize) { + String sql = String.format("select RECORD_ID from LOG_RECORDS LIMIT %d,%d", i, stepsize); + try (ResultSet rs = stmt.executeQuery(sql)) { + goAgain = false; + while (rs.next()) { + long n = rs.getLong("RECORD_ID"); + nbs.set(n); + goAgain = true; + } + } + } + } + seqSet = nbs; + // Compare with the range for this server + // Determine the next ID for this set of record IDs + RLEBitSet tbs = (RLEBitSet) nbs.clone(); + RLEBitSet idset = new RLEBitSet(); + idset.set(setStart, setStart + SET_SIZE); + tbs.and(idset); + long t = tbs.length(); + nextId = (t == 0) ? setStart : (t - 1); + if (nextId >= setStart + SET_SIZE) { + // Handle wraparound, when the IDs reach the end of our "range" + Long[] last = null; + Iterator<Long[]> li = tbs.getRangeIterator(); + while (li.hasNext()) { + last = li.next(); + } + if (last != null) { + tbs.clear(last[0], last[1] + 1); + t = tbs.length(); + nextId = (t == 0) ? setStart : (t - 1); + } + } + logger.debug(String.format("initializeNextid, next ID is %d (%x)", nextId, nextId)); + } catch (SQLException e) { + logger.error(e.toString()); + } finally { + db.release(conn); + } + } + + @SuppressWarnings("resource") + int[] process(File f) { + int ok = 0; + int total = 0; + try { + Connection conn = db.getConnection(); + PreparedStatement ps = conn.prepareStatement(INSERT_SQL); + Reader r = f.getPath().endsWith(".gz") + ? new InputStreamReader(new GZIPInputStream(new FileInputStream(f))) + : new FileReader(f); + try (LineNumberReader in = new LineNumberReader(r)) { + String line; + while ((line = in.readLine()) != null) { + try { + for (Loadable rec : buildRecords(line)) { + rec.load(ps); + if (rec instanceof LogRecord) { + LogRecord lr = ((LogRecord) rec); + if (!seqSet.get(lr.getRecordId())) { + ps.executeUpdate(); + seqSet.set(lr.getRecordId()); + } else { + logger.debug("Duplicate record ignored: " + lr.getRecordId()); + } + } else { + if (++nextId > setEnd) { + nextId = setStart; + } + ps.setLong(18, nextId); + ps.executeUpdate(); + seqSet.set(nextId); + } + ps.clearParameters(); + ok++; + } + } catch (SQLException e) { + logger.warn("PROV8003 Invalid value in record: " + line, e); + } catch (NumberFormatException e) { + logger.warn("PROV8004 Invalid number in record: " + line, e); + } catch (ParseException e) { + logger.warn("PROV8005 Invalid date in record: " + line, e); + } catch (Exception e) { + logger.warn("PROV8006 Invalid pattern in record: " + line, e); + } + total++; + } + } + ps.close(); + db.release(conn); + } catch (SQLException | IOException e) { + logger.warn("PROV8007 Exception reading " + f + ": " + e); + } + return new int[]{ok, total}; + } + + Loadable[] buildRecords(String line) throws ParseException { + String[] pp = line.split("\\|"); + if (pp != null && pp.length >= 7) { + String rtype = pp[1].toUpperCase(); + if ("PUB".equals(rtype) && pp.length == 11) { + // Fields are: date|PUB|pubid|feedid|requrl|method|ctype|clen|srcip|user|status + return new Loadable[]{new PublishRecord(pp)}; + } + if ("DEL".equals(rtype) && pp.length == 12) { + // Fields are: date|DEL|pubid|feedid|subid|requrl|method|ctype|clen|user|status|xpubid + String[] subs = pp[4].split("\\s+"); + if (subs != null) { + Loadable[] rv = new Loadable[subs.length]; + for (int i = 0; i < subs.length; i++) { + // create a new record for each individual sub + pp[4] = subs[i]; + rv[i] = new DeliveryRecord(pp); + } + return rv; + } + } + if ("EXP".equals(rtype) && pp.length == 11) { + // Fields are: date|EXP|pubid|feedid|subid|requrl|method|ctype|clen|reason|attempts + ExpiryRecord e = new ExpiryRecord(pp); + if ("other".equals(e.getReason())) { + logger.info("Invalid reason '" + pp[9] + "' changed to 'other' for record: " + e.getPublishId()); + } + return new Loadable[]{e}; + } + if ("PBF".equals(rtype) && pp.length == 12) { + // Fields are: date|PBF|pubid|feedid|requrl|method|ctype|clen-expected|clen-received|srcip|user|error + return new Loadable[]{new PubFailRecord(pp)}; + } + if ("DLX".equals(rtype) && pp.length == 7) { + // Fields are: date|DLX|pubid|feedid|subid|clen-tosend|clen-sent + return new Loadable[]{new DeliveryExtraRecord(pp)}; + } + if ("LOG".equals(rtype) && (pp.length == 19 || pp.length == 20)) { + // Fields are: date|LOG|pubid|feedid|requrl|method|ctype|clen|type|feedFileid|remoteAddr|user|status|subid|fileid|result|attempts|reason|record_id + return new Loadable[]{new LogRecord(pp)}; + } + } + logger.warn("PROV8002 bad record: " + line); + return new Loadable[0]; + } + + /** + * The LogfileLoader can be run stand-alone by invoking the main() method of this class. + * + * @param a ignored + */ + public static void main(String[] a) throws InterruptedException { + LogfileLoader.getLoader(); + Thread.sleep(200000L); + } +} diff --git a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java index 44142031..cb6881fb 100644 --- a/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java +++ b/datarouter-prov/src/main/java/org/onap/dmaap/datarouter/provisioning/utils/PasswordProcessor.java @@ -21,14 +21,15 @@ package org.onap.dmaap.datarouter.provisioning.utils;
+import java.nio.charset.StandardCharsets;
+import java.security.GeneralSecurityException;
+import java.util.Base64;
+
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.PBEParameterSpec;
-import java.nio.charset.StandardCharsets;
-import java.security.GeneralSecurityException;
-import java.util.Base64;
/**
* The Processing of a Password. Password can be encrypted and decrypted.
@@ -37,13 +38,14 @@ import java.util.Base64; */
public class PasswordProcessor {
- private PasswordProcessor(){}
-
private static final String SECRET_KEY_FACTORY_TYPE = "PBEWithMD5AndDES";
private static final String PASSWORD_ENCRYPTION_STRING = (new DB()).getProperties().getProperty("org.onap.dmaap.datarouter.provserver.passwordencryption");
private static final char[] PASSWORD = PASSWORD_ENCRYPTION_STRING.toCharArray();
private static final byte[] SALT = {(byte) 0xde, (byte) 0x33, (byte) 0x10, (byte) 0x12, (byte) 0xde, (byte) 0x33, (byte) 0x10, (byte) 0x12,};
+ private PasswordProcessor(){
+ }
+
/**
* Encrypt password.
* @param property the Password
|