diff options
author | Instrumental <jonathan.gathman@att.com> | 2019-04-13 08:27:16 -0500 |
---|---|---|
committer | Instrumental <jonathan.gathman@att.com> | 2019-04-13 09:35:19 -0500 |
commit | 66424b306877435b7e71e119a8d1498b4b263719 (patch) | |
tree | 2b13bc6c9968fb7e09e748f979ba4799bbfa2fe1 /auth/auth-batch/src/main/java | |
parent | d087d3ddd2829ced1d2ffccc5147c377cbcc92cb (diff) |
Improve Upload Batch
Issue-ID: AAF-811
Change-Id: Ia2289c764d0c5a14627d58349397a121f1960703
Signed-off-by: Instrumental <jonathan.gathman@att.com>
Diffstat (limited to 'auth/auth-batch/src/main/java')
9 files changed, 213 insertions, 245 deletions
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/BatchDataView.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/BatchDataView.java index 83945ee6..9f269d0d 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/BatchDataView.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/BatchDataView.java @@ -113,7 +113,6 @@ public class BatchDataView implements DataView { @Override public Result<FutureDAO.Data> delete(AuthzTrans trans, FutureDAO.Data fdd) { - cqlBatch.preLoop(); StringBuilder sb = cqlBatch.inc(); sb.append("DELETE from authz.future WHERE id = "); sb.append(fdd.id.toString()); @@ -122,7 +121,6 @@ public class BatchDataView implements DataView { @Override public Result<ApprovalDAO.Data> delete(AuthzTrans trans, ApprovalDAO.Data add) { - cqlBatch.preLoop(); StringBuilder sb = cqlBatch.inc(); sb.append("DELETE from authz.approval WHERE id = "); sb.append(add.id.toString()); @@ -132,7 +130,6 @@ public class BatchDataView implements DataView { @Override public Result<ApprovalDAO.Data> insert(AuthzTrans trans, ApprovalDAO.Data add) { - cqlBatch.preLoop(); StringBuilder sb = cqlBatch.inc(); sb.append("INSERT INTO authz.approval (id,approver,memo,operation,status,ticket,type,user) VALUES ("); sb.append(add.id.toString()); @@ -158,7 +155,6 @@ public class BatchDataView implements DataView { @Override public Result<FutureDAO.Data> insert(AuthzTrans trans, FutureDAO.Data fdd) { - cqlBatch.preLoop(); StringBuilder sb = cqlBatch.inc(); sb.append("INSERT INTO authz.future (id,construct,expires,memo,start,target,target_key,target_date) VALUES ("); sb.append(fdd.id.toString()); diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java index 9f685ad4..4547fb1b 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java @@ -73,6 +73,14 @@ public class CQLBatch { return rv; } + public ResultSet singleExec(StringBuilder query, boolean dryRun) { + if(dryRun) { + return null; + } else { + return session.execute(query.toString()); + } + } + public void touch(String table, int begin, int end, boolean dryRun) { StringBuilder sb = begin(); for(int i=begin;i<end;++i) { diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java index 945fe0b3..2836d041 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java @@ -21,62 +21,98 @@ package org.onap.aaf.auth.batch.helpers; public class CQLBatchLoop { + private static final int MAX_CHARS = (50 * 1024)/2; private final CQLBatch cqlBatch; private final int maxBatch; private final StringBuilder sb; private final boolean dryRun; private int i; - private int count; + private int total; private int batches; + private final StringBuilder current; + private boolean showProgress; public CQLBatchLoop(CQLBatch cb, int max, boolean dryRun) { cqlBatch = cb; i=0; - count = 0; + total = 0; maxBatch = max; sb = cqlBatch.begin(); + current = new StringBuilder(); this.dryRun = dryRun; + showProgress = false; } - /** - * Put at the first part of your Loop Logic... It checks if you have enough lines to - * push a batch. - */ - public void preLoop() { - if(i<0) { - cqlBatch.begin(); - } else if(i>=maxBatch || sb.length()>24000) { - cqlBatch.execute(dryRun); - cqlBatch.begin(); - i=0; - ++batches; - } + public CQLBatchLoop showProgress() { + showProgress = true; + return this; } - /** * Assume this is another line in the Batch * @return */ public StringBuilder inc() { + if(i>=maxBatch || current.length()+sb.length()>MAX_CHARS) { + if(i>0) { + cqlBatch.execute(dryRun); + i = -1; + incBatch(); + } + } + if(i<0) { + cqlBatch.begin(); + i=0; + } + if(current.length() > MAX_CHARS) { + cqlBatch.singleExec(current, dryRun); + } else { + sb.append(current); + } + current.setLength(0); ++i; - ++count; - return sb; + ++total; + return current; } /** - * Close up when done. However, can go back to "preLoop" safely. + * Close up when finished. */ public void flush() { - if(i>0) { + if(current.length()+sb.length()>MAX_CHARS) { + if(i>0) { + cqlBatch.execute(dryRun); + incBatch(); + } + if(current.length()>0) { + cqlBatch.singleExec(current, dryRun); + current.setLength(0); + incBatch(); + } + } else { + if(i<0) { + cqlBatch.begin(); + } + sb.append(current); + current.setLength(0); cqlBatch.execute(dryRun); - ++batches; + incBatch(); } i=-1; } + private void incBatch() { + ++batches; + if(showProgress) { + System.out.print('.'); + if(batches%70==0) { + System.out.println(); + } + } + } + public int total() { - return count; + return total; } public int batches() { @@ -84,8 +120,12 @@ public class CQLBatchLoop { } public void reset() { - count = 0; + total = 0; batches = 0; i = -1; } + + public String toString() { + return cqlBatch.toString(); + } } diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/LastNotified.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/LastNotified.java index 0120ba40..e6942f09 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/LastNotified.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/LastNotified.java @@ -47,7 +47,7 @@ import com.datastax.driver.core.Statement; public class LastNotified { private Map<String,Date> lastNotified = new TreeMap<>(); private Session session; - private static final Date never = new Date(0); + public static final Date NEVER = new Date(0); private static final String SELECT = "SELECT user,target,key,last FROM authz.notified"; public LastNotified(Session session) { @@ -88,12 +88,13 @@ public class LastNotified { * @return */ public Date lastNotified(String user, String target, String targetkey) { - String key = user + '|' + target + '|' + targetkey; + String key = user + '|' + target + '|' + (targetkey==null?"":targetkey); return lastNotified(key); } public Date lastNotified(String key) { - return lastNotified.computeIfAbsent(key, k -> never); + Date d = lastNotified.get(key); + return d==null?NEVER:d; } private Date add(ResultSet result, Map<String, Date> lastNotified, MarkDelete md) { diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Analyze.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Analyze.java index a8ec8268..51400f87 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Analyze.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Analyze.java @@ -263,7 +263,7 @@ public class Analyze extends Batch { approver = appr.getApprover(); Pending n = pendingTemp.get(approver); if(n==null) { - Date lastNotified = ln.lastNotified(approver,"ur",ticket.f.fdd.target_key); + Date lastNotified = ln.lastNotified(approver,"pending",null); pendingTemp.put(approver,new Pending(lastNotified)); } else { n.inc(); @@ -332,7 +332,7 @@ public class Analyze extends Batch { for(Entry<String, Pending> es : pendingApprs.entrySet()) { Pending p = es.getValue(); if(p.newApprovals() - || p.earliest() == null + || p.earliest() == LastNotified.NEVER // yes, equals. || p.earliest().after(remind)) { p.row(needApproveCW,es.getKey()); } diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Notify.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Notify.java index cc6a611e..7cb8c1bd 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Notify.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/reports/Notify.java @@ -310,7 +310,6 @@ import org.onap.aaf.misc.env.util.Chrono; npab.store(rs.asList()); if(notify(noAvg, npab)>0) { // Update - cbl.preLoop(); // lastN.update(cbl.inc(),key,"pending",""); npab.record(trans,cbl.inc(), id, idList, lastN); npab.inc(); @@ -370,7 +369,6 @@ import org.onap.aaf.misc.env.util.Chrono; content.append(footer); if(mailer.sendEmail(trans, test, toList, ccList, nb.subject(),content.toString(), urgent)) { - cbl.preLoop(); nb.record(trans,cbl.inc(), id, idList, lastN); nb.inc(); } else { diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/temp/DataMigrateDublin.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/temp/DataMigrateDublin.java index bcd727ac..35970f50 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/temp/DataMigrateDublin.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/temp/DataMigrateDublin.java @@ -110,7 +110,6 @@ public class DataMigrateDublin extends Batch { } if(!btag.equals(tag)) { - cbl.preLoop(); update(cbl,row,btag); } break; @@ -150,7 +149,6 @@ public class DataMigrateDublin extends Batch { X509Certificate xc = (X509Certificate)c; for(CredInfo ci : list) { if(xc.getNotAfter().equals(ci.expires)) { - cbl.preLoop(); ci.update(cbl, ca + '|' + xc.getSerialNumber()); break; } diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Remove.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Remove.java index 51cfed01..b6b16fe3 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Remove.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Remove.java @@ -124,7 +124,6 @@ public class Remove extends Batch { CSV removeCSV = new CSV(env.access(),f); try { removeCSV.visit( row -> { - cbl.preLoop(); switch(row.get(0)) { case "info": switch(row.get(1)) { diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java index a3d37a0e..537cd932 100644 --- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java +++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java @@ -22,73 +22,33 @@ package org.onap.aaf.auth.batch.update; -import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.onap.aaf.auth.batch.Batch; import org.onap.aaf.auth.batch.helpers.CQLBatch; import org.onap.aaf.auth.batch.helpers.CQLBatchLoop; import org.onap.aaf.auth.env.AuthzTrans; import org.onap.aaf.auth.org.OrganizationException; +import org.onap.aaf.cadi.util.CSV; import org.onap.aaf.misc.env.APIException; import org.onap.aaf.misc.env.Env; import org.onap.aaf.misc.env.LogTarget; import org.onap.aaf.misc.env.TimeTaken; +import org.onap.aaf.misc.env.util.Split; public class Upload extends Batch { - private CQLBatchLoop cqlBatch; + private static final String DAT = ".dat"; - // APPROVALS - private static final String APPR_INS_FMT=" INSERT INTO authz.approval " - + "(id,approver,last_notified,memo,operation,status,ticket,type,user) " - + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] APPR_QUOTES = new Boolean[]{false,true,true,true,true,true,false,true,true}; + private CQLBatch cqlBatch; - // ARTIFACTS - private static final String ARTI_INS_FMT=" INSERT INTO authz.artifact " - + "(mechid,machine,ca,dir,expires,notify,ns,os_user,renewdays,sans,sponsor,type) " - + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] ARTI_QUOTES = new Boolean[] - {true,true,true,true,true,true,true,true,false,false,true,false}; + private Map<String,Feed> feeds; - // CREDS - private static final String CRED_INS_FMT=" INSERT INTO authz.cred " - + "(id,type,expires,cred,notes,ns,other,prev) " - + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] CRED_QUOTES = new Boolean[] - {true,false,true,false,true,true,false,false}; - - // NS - private static final String NS_INS_FMT=" INSERT INTO authz.ns " - + "(name,description,parent,scope,type) " - + "VALUES (%s,%s,%s,%s,%s);\n"; - private static final Boolean[] NS_QUOTES = new Boolean[] - {true,true,true,false,false}; - - // x509 - private static final String X509_INS_FMT=" INSERT INTO authz.x509 " - + "(ca,serial,id,x500,x509) " - + "VALUES (%s,%s,%s,%s,%s);\n"; - private static final Boolean[] X509_QUOTES = new Boolean[] - {true,false,true,true,true}; - - // ROLE - private static final String ROLE_INS_FMT=" INSERT INTO authz.role " - + "(ns,name,description,perms) " - + "VALUES (%s,%s,%s,%s);\n"; - private static final Boolean[] ROLE_QUOTES = new Boolean[] - {true,true,true,false}; - // ROLE - private static final String PERM_INS_FMT=" INSERT INTO authz.perm " - + "(ns,type,instance,action,description,roles) " - + "VALUES (%s,%s,%s,%s,%s,%s);\n"; - private static final Boolean[] PERM_QUOTES = new Boolean[] - {true,true,true,true,true,false}; public Upload(AuthzTrans trans) throws APIException, IOException, OrganizationException { super(trans.env()); @@ -103,192 +63,160 @@ public class Upload extends Batch { tt.done(); } - cqlBatch = new CQLBatchLoop(new CQLBatch(LogTarget.NULL,session),50,dryRun); + cqlBatch = new CQLBatch(LogTarget.NULL,session); + + feeds=new HashMap<>(); + new Feed(feeds,"ns",1,"name,description,parent,scope=int,type=int",300); + new Feed(feeds,"notified",3,"user,target,key,last",300); + new Feed(feeds,"approval",1,"id=UUID,approver,last_notified,memo,operation,status,ticket=UUID,type,user",200); + new Feed(feeds,"artifact",2,"mechid,machine,ca,dir,expires,notify,ns,os_user,renewdays=int,sans=set,sponsor,type=set",200); + new Feed(feeds,"cred",1,"id,type=int,expires,cred=blob,notes,ns,other=int,prev=blob,tag",200); + new Feed(feeds,"x509",2,"ca,serial=blob,id,x500,x509=C/R",200); + new Feed(feeds,"role",2,"ns,name,description,perms=set",200); + new Feed(feeds,"perm",4,"ns,type,instance,action,description,roles=set",200); + new Feed(feeds,"history",1,"id=UUID,action,memo,reconstruct=blob,subject,target,user,yr_mon=int",300); + } finally { tt0.done(); } } + @Override protected void run(AuthzTrans trans) { - String line; - StringBuilder sb = new StringBuilder(); - List<String> array = new ArrayList<String>(); - for(String feed : args()) { - File file; - if(feed.endsWith(".dat")) { - file = new File(feed); - feed = file.getName(); - feed = feed.substring(0,feed.length()-4); + List<File> files = new ArrayList<>(); + if(args().length>0) { + File dir = new File(args()[0]); + if(dir.isDirectory()) { + for(File f : dir.listFiles(pathname -> { + return pathname.getName().endsWith(DAT); + })) { + files.add(f); + } } else { - file = new File(feed+".dat"); - } - TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB); - String msg = String.format("#### Running %s.dat Feed ####",feed); - trans.info().log(msg); - System.out.println(msg); - BufferedReader br = null; - try { - if(file.exists()) { - try { - br = new BufferedReader(new FileReader(file)); - try { - while((line=br.readLine())!=null) { - if(line.length()>5000) { - cqlBatch.flush(); - } - cqlBatch.preLoop(); - - // Split into fields, first turning Escaped values into something we can convert back from - char c=0; - boolean inQuote = false; - int fldcnt = 0; - for(int i=0;i<line.length();++i) { - switch(c=line.charAt(i)) { - case '"': - inQuote = !inQuote; - break; - case '|': - if(inQuote) { - sb.append(c); - } else { - addField(feed,fldcnt++,array,sb); - } - break; - default: - sb.append(c); - } - } - addField(feed,fldcnt,array,sb); - cqlBatch.inc().append(build(feed, array)); - } - cqlBatch.flush(); - } catch (Exception t) { - trans.error().log(t); - } finally { - br.close(); - } - } catch (IOException e) { - trans.error().log(e); + File f; + for(String arg : args()) { + if(arg.endsWith(DAT)) { + f=new File(arg); + } else { + f=new File(arg+DAT); } - } else { - trans.error().log("No file found: ", file.getAbsolutePath()); + files.add(f); } - } finally { - tt.done(); - System.err.flush(); - msg = String.format("\n%d applied in %d batches\n",cqlBatch.total(), cqlBatch.batches()); + } + } + for(File file : files) { + String f = file.getName(); + final Feed feed = feeds.get(f.substring(0,f.length()-4)); + if(feed!=null) { + TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB); + String msg = String.format("#### Running %s.dat Feed ####",feed.getName()); trans.info().log(msg); System.out.println(msg); + CQLBatchLoop cbl = new CQLBatchLoop(cqlBatch,feed.batchSize,dryRun).showProgress(); + + try { + if(file.exists()) { + CSV csv = new CSV(trans.env().access(),file).setDelimiter('|'); + csv.visit( row -> { + feed.insert(cbl.inc(),row); + }); + } + cbl.flush(); + } catch (Throwable e) { + e.printStackTrace(); + } finally { + tt.done(); + System.err.flush(); + msg = String.format("\n%d applied in %d batches\n",cbl.total(), cbl.batches()); + trans.info().log(msg); + System.out.println(msg); + } } } - + } + + @Override + protected void _close(AuthzTrans trans) { + session.close(); } - - private String build(String feed, List<String> array) { - String rv; - if(array.size()>0) { - switch(feed) { - case "approval": - rv = String.format(APPR_INS_FMT,array.toArray()); - break; - case "artifact": - rv = String.format(ARTI_INS_FMT,array.toArray()); - break; - case "cred": - rv = String.format(CRED_INS_FMT,array.toArray()); - break; - case "ns": - rv = String.format(NS_INS_FMT,array.toArray()); - break; - case "role": - rv = String.format(ROLE_INS_FMT,array.toArray()); - break; - case "perm": - rv = String.format(PERM_INS_FMT,array.toArray()); - break; - case "x509": - rv = String.format(X509_INS_FMT,array.toArray()); - break; - default: - rv = ""; + private class Feed { + private final String name; + private final String[] flds; + private final String[] types; + private final int key; + private final int batchSize; + public Feed(Map<String, Feed> feeds, String feed, int keyLength, String fields,int batchSize) { + name=feed; + key = keyLength; + flds = Split.splitTrim(',', fields); + types = new String[flds.length]; + this.batchSize = batchSize; + int equals; + for(int i=0;i<flds.length;++i) { + if((equals = flds[i].indexOf('='))>0) { + types[i]=flds[i].substring(equals+1); + flds[i]=flds[i].substring(0, equals); + } } - array.clear(); - } else { - rv = ""; + feeds.put(feed,this); } - return rv; - } - - private void addField(String feed, int fldcnt, List<String> array, StringBuilder sb) { - Boolean[] ba; - switch(feed) { - case "approval": - ba = APPR_QUOTES; - break; - case "artifact": - ba = ARTI_QUOTES; - break; - case "cred": - ba = CRED_QUOTES; - break; - case "ns": - ba = NS_QUOTES; - break; - case "role": - ba = ROLE_QUOTES; - break; - case "perm": - ba = PERM_QUOTES; - break; - case "x509": - ba = X509_QUOTES; - break; - default: - ba = null; + + public String getName() { + return name; } - if(ba!=null) { - if(sb.toString().length()==0) { - array.add("null"); - } else { - if(ba[fldcnt]) { - String s = null; - if(sb.indexOf("'")>=0) { - s = sb.toString().replace("'","''"); - } - if(sb.indexOf("\\n")>=0) { - if(s==null) { - s = sb.toString().replace("\\n","\n"); - } else { - s = s.replace("\\n","\n"); - } + + public void insert(StringBuilder sb,List<String> row) { + sb.append("INSERT INTO authz."); + sb.append(name); + sb.append(" ("); + boolean first = true; + StringBuilder values = new StringBuilder(") VALUES ("); + String value; + String type; + for(int idx=0;idx<row.size();++idx) { + value = row.get(idx).trim(); + if(idx<key || !(value.isEmpty() || "null".equals(value))) { + if(first) { + first = false; + } else { + sb.append(','); + values.append(','); } - if(sb.indexOf("\\t")>=0) { - if(s==null) { - s = sb.toString().replace("\\t","\t"); - } else { - s = s.replace("\\t","\t"); + sb.append(flds[idx]); + type=types[idx]; + if(type==null) { // String is default. + switch(value) { + case "": + if(idx<key) { + // Key value has to be something, but can't be actual null + values.append("''"); + } else { + values.append("null"); + } + break; + default: + values.append('\''); + values.append(value.replaceAll("'","''")); + values.append('\''); } + } else switch(type) { + case "C/R": + values.append('\''); + values.append(value.replaceAll("\\\\n", "\n")); + values.append('\''); + break; + default: + values.append(value); + break; + } - if(s==null) { - array.add("'" + sb + '\''); - } else { - array.add("'" + s + '\''); - } - } else { - array.add(sb.toString()); } } - sb.setLength(0); + sb.append(values); + sb.append(");\n"); } } - - - @Override - protected void _close(AuthzTrans trans) { - session.close(); - } - } |