diff options
6 files changed, 241 insertions, 57 deletions
diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java index abdfa718..bef8dab2 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryQueue.java @@ -72,7 +72,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { private boolean failed; private long failduration; private long resumetime; - File dir; + private File dir; private Vector<DeliveryTask> todo = new Vector<>(); /** @@ -80,7 +80,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { * * @return The length of the task in bytes or 0 if the task cannot be cancelled. */ - public synchronized long cancelTask(String pubid) { + synchronized long cancelTask(String pubid) { if (working.get(pubid) != null) { return (0); } @@ -111,7 +111,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has succeeded. */ - public synchronized void markSuccess(DeliveryTask task) { + private synchronized void markSuccess(DeliveryTask task) { working.remove(task.getPublishId()); task.clean(); failed = false; @@ -121,14 +121,14 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has expired. */ - public synchronized void markExpired(DeliveryTask task) { + private synchronized void markExpired(DeliveryTask task) { task.clean(); } /** * Mark that a delivery task has failed permanently. */ - public synchronized void markFailNoRetry(DeliveryTask task) { + private synchronized void markFailNoRetry(DeliveryTask task) { working.remove(task.getPublishId()); task.clean(); failed = false; @@ -157,7 +157,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has been redirected. */ - public synchronized void markRedirect(DeliveryTask task) { + private synchronized void markRedirect(DeliveryTask task) { working.remove(task.getPublishId()); retry.put(task.getPublishId(), task); } @@ -165,7 +165,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Mark that a delivery task has temporarily failed. */ - public synchronized void markFailWithRetry(DeliveryTask task) { + private synchronized void markFailWithRetry(DeliveryTask task) { working.remove(task.getPublishId()); retry.put(task.getPublishId(), task); fdupdate(); @@ -174,7 +174,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Get the next task. */ - public synchronized DeliveryTask getNext() { + synchronized DeliveryTask getNext() { DeliveryTask ret = peekNext(); if (ret != null) { todoindex++; @@ -186,7 +186,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Peek at the next task. */ - public synchronized DeliveryTask peekNext() { + synchronized DeliveryTask peekNext() { long now = System.currentTimeMillis(); long mindate = now - deliveryQueueHelper.getExpirationTimer(); if (failed) { @@ -199,7 +199,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { while (true) { if (todoindex >= todo.size()) { todoindex = 0; - todo = new Vector<DeliveryTask>(); + todo = new Vector<>(); String[] files = dir.list(); Arrays.sort(files); for (String fname : files) { @@ -228,7 +228,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { } todo.add(dt); } - retry = new Hashtable<String, DeliveryTask>(); + retry = new Hashtable<>(); } if (todoindex < todo.size()) { DeliveryTask dt = todo.get(todoindex); @@ -236,6 +236,11 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { todoindex++; continue; } + if (destinationInfo.isPrivilegedSubscriber() && dt.getResumeTime() > System.currentTimeMillis()) { + retry.put(dt.getPublishId(), dt); + todoindex++; + continue; + } if (dt.getDate() >= mindate) { return (dt); } @@ -250,7 +255,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Create a delivery queue for a given destination info */ - public DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { + DeliveryQueue(DeliveryQueueHelper deliveryQueueHelper, DestInfo destinationInfo) { this.deliveryQueueHelper = deliveryQueueHelper; this.destinationInfo = destinationInfo; dir = new File(destinationInfo.getSpool()); @@ -288,7 +293,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Message too old to deliver */ - public void reportExpiry(DeliveryTask task) { + void reportExpiry(DeliveryTask task) { StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts()); markExpired(task); } @@ -298,8 +303,9 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { */ public void reportStatus(DeliveryTask task, int status, String xpubid, String location) { if (status < 300) { - StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); - if (destinationInfo.isPrivilegedSubscriber()) { + StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), destinationInfo.getAuthUser(), status, xpubid); + if (destinationInfo.isPrivilegedSubscriber()) { + task.setResumeTime(System.currentTimeMillis() + deliveryQueueHelper.getWaitForFileProcessFailureTimer()); markFailWithRetry(task); } else { markSuccess(task); @@ -367,21 +373,21 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { /** * Is there no work to do for this queue right now? */ - public synchronized boolean isSkipSet() { + synchronized boolean isSkipSet() { return (peekNext() == null); } /** * Reset the retry timer */ - public void resetQueue() { + void resetQueue() { resumetime = System.currentTimeMillis(); } /** * Get task if in queue and mark as success */ - public boolean markTaskSuccess(String pubId) { + boolean markTaskSuccess(String pubId) { DeliveryTask task = working.get(pubId); if (task != null) { markSuccess(task); @@ -391,7 +397,7 @@ public class DeliveryQueue implements Runnable, DeliveryTaskHelper { if (task != null) { retry.remove(pubId); task.clean(); - resumetime = 0; + resetQueue(); failduration = 0; return true; } diff --git a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java index b64396bc..46c46675 100644 --- a/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java +++ b/datarouter-node/src/main/java/org/onap/dmaap/datarouter/node/DeliveryTask.java @@ -67,6 +67,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { private boolean followRedirects; private String[][] hdrs; private String newInvocationId; + private long resumeTime; /** @@ -77,7 +78,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { * the base for the file name in the spool directory and is of * the form <milliseconds since 1970>.<fqdn of initial data router node> */ - public DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { + DeliveryTask(DeliveryTaskHelper deliveryTaskHelper, String pubid) { this.deliveryTaskHelper = deliveryTaskHelper; this.pubid = pubid; destInfo = deliveryTaskHelper.getDestinationInfo(); @@ -91,6 +92,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { metafile = new File(mfn); boolean monly = destInfo.isMetaDataOnly(); date = Long.parseLong(pubid.substring(0, pubid.indexOf('.'))); + resumeTime = System.currentTimeMillis(); Vector<String[]> hdrv = new Vector<>(); try (BufferedReader br = new BufferedReader(new FileReader(metafile))) { @@ -167,7 +169,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Get the publish ID */ - public String getPublishId() { + String getPublishId() { return (pubid); } @@ -334,7 +336,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Remove meta and data files */ - public void clean() { + void clean() { datafile.delete(); metafile.delete(); eelflogger.info(EelfMsgs.INVOKE, newInvocationId); @@ -343,9 +345,23 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { } /** + * Set the resume time for a delivery task. + */ + void setResumeTime(long resumeTime) { + this.resumeTime = resumeTime; + } + + /** + * Get the resume time for a delivery task. + */ + long getResumeTime() { + return resumeTime; + } + + /** * Has this delivery task been cleaned? */ - public boolean isCleaned() { + boolean isCleaned() { return (hdrs == null); } @@ -359,7 +375,7 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Get creation date as encoded in the publish ID. */ - public long getDate() { + long getDate() { return (date); } @@ -373,42 +389,42 @@ public class DeliveryTask implements Runnable, Comparable<DeliveryTask> { /** * Get the content type */ - public String getCType() { + String getCType() { return (ctype); } /** * Get the method */ - public String getMethod() { + String getMethod() { return (method); } /** * Get the file ID */ - public String getFileId() { + String getFileId() { return (fileid); } /** * Get the number of delivery attempts */ - public int getAttempts() { + int getAttempts() { return (attempts); } /** * Get the (space delimited list of) subscription ID for this delivery task */ - public String getSubId() { + String getSubId() { return (subid); } /** * Get the feed ID for this delivery task */ - public String getFeedId() { + String getFeedId() { return (feedid); } diff --git a/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/ProvDataTest.java b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/ProvDataTest.java new file mode 100644 index 00000000..562167bc --- /dev/null +++ b/datarouter-node/src/test/java/org/onap/dmaap/datarouter/node/ProvDataTest.java @@ -0,0 +1,147 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.dmaap.datarouter.node; + + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.ByteArrayInputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.nio.charset.StandardCharsets; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; + +@RunWith(PowerMockRunner.class) +public class ProvDataTest { + + + @Test + public void Validate_Values_Are_Set_Correctly_Through_ProvData_Constuctor() throws Exception { + String InternalProvData = + "{" + + "\"ingress\":[{" + + "\"feedid\":1," + + "\"subnet\":\"\"," + + "\"user\":\"\"," + + "\"node\":\"node\"" + + "}]," + + "\"routing\":[{" + + "\"from\":\"172.10.10.10\"," + + "\"to\":\"172.10.10.12\"," + + "\"via\":\"172.10.10.11\"" + + "}]," + + "\"subscriptions\":[{" + + "\"subid\":1," + + "\"suspend\":false," + + "\"delivery\":{" + + "\"use100\":true," + + "\"password\":\"PASSWORD\"," + + "\"user\":\"LOGIN\"," + + "\"url\":\"http://172.18.0.2:7070\"" + + "}," + + "\"last_mod\":1553608460000," + + "\"subscriber\":\"PMMAPER\"," + + "\"feedid\":1," + + "\"decompress\":false," + + "\"groupid\":1," + + "\"metadataOnly\":false," + + "\"links\":{" + + "\"feed\":\"https://dmaap-dr-prov/feed/1\"" + + ",\"log\":\"https://dmaap-dr-prov/sublog/1\"" + + ",\"self\":\"https://dmaap-dr-prov/subs/1\"" + + "}," + + "\"created_date\":1553608460000," + + "\"privilegedSubscriber\":false" + + "}]," + + "\"feeds\":[{" + + "\"suspend\":false," + + "\"groupid\":0," + + "\"description\":\"Default feed\"," + + "\"version\":\"m1.0\"," + + "\"authorization\":{" + + "\"endpoint_addrs\":[\"172.10.10.20\"]," + + "\"classification\":\"unclassified\"," + + "\"endpoint_ids\":[{" + + "\"password\":\"password\"," + + "\"id\":\"user\"" + + "}]" + + "}," + + "\"last_mod\":1553608454000," + + "\"deleted\":false," + + "\"feedid\":1," + + "\"name\":\"CSIT_Test2\"" + + ",\"business_description\":\"Default Feed\"" + + ",\"publisher\":\"dradmin\"" + + ",\"links\":{" + + "\"subscribe\":\"https://dmaap-dr-prov/subscribe/1\"," + + "\"log\":\"https://dmaap-dr-prov/feedlog/1\"," + + "\"publish\":\"https://dmaap-dr-prov/publish/1\"," + + "\"self\":\"https://dmaap-dr-prov/feed/1\"" + + "}," + + "\"created_date\":1553608454000" + + "}]," + + "\"groups\":[]," + + "\"parameters\":{" + + "\"NODES\":[\"dmaap-dr-node\"]," + + "\"PROV_DOMAIN\":\"\"" + + "}," + + "\"egress\":{" + + "\"1\":1" + + "}" + + "}" ; + Reader r = new InputStreamReader(new ByteArrayInputStream(InternalProvData.getBytes(StandardCharsets.UTF_8))); + ProvData pd = new ProvData(r); + + assertEquals(pd.getNodes().length, 1); + assertEquals(pd.getNodes()[0].getCName(), "dmaap-dr-node."); + + assertEquals(pd.getFeedUsers().length, 1); + assertEquals(pd.getFeedUsers()[0].getUser(), "user"); + assertEquals(pd.getFeedUsers()[0].getFeedId(), "1"); + assertEquals(pd.getFeeds().length, 1); + assertEquals(pd.getFeeds()[0].getId(), "1"); + assertEquals(pd.getFeedSubnets().length, 1); + assertEquals(pd.getFeedSubnets()[0].getFeedId(), "1"); + assertEquals(pd.getFeedSubnets()[0].getCidr(), "172.10.10.20"); + assertEquals(pd.getFeedSubnets()[0].getCidr(), "172.10.10.20"); + assertEquals(pd.getSubscriptions()[0].getFeedId(), "1"); + assertEquals(pd.getSubscriptions()[0].getSubId(), "1"); + assertEquals(pd.getSubscriptions()[0].getAuthUser(), "LOGIN"); + assertEquals(pd.getSubscriptions()[0].getURL(), "http://172.18.0.2:7070"); + assertEquals(pd.getForceEgress().length, 1); + assertEquals(pd.getForceEgress()[0].getNode(), "1"); + assertEquals(pd.getForceEgress()[0].getSubId(), "1"); + assertEquals(pd.getForceIngress().length, 1); + assertEquals(pd.getForceIngress()[0].getFeedId(), "1"); + assertNull(pd.getForceIngress()[0].getSubnet()); + assertNull(pd.getForceIngress()[0].getUser()); + assertEquals(pd.getHops().length, 1); + assertEquals(pd.getHops()[0].getFrom(), "172.10.10.10"); + assertEquals(pd.getHops()[0].getTo(), "172.10.10.12"); + assertEquals(pd.getHops()[0].getVia(), "172.10.10.11"); + assertEquals(pd.getParams().length, 1); + assertEquals(pd.getParams()[0].getName(), "PROV_DOMAIN"); + } +} diff --git a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoaderTest.java b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoaderTest.java index e24a9a3f..4f556005 100644 --- a/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoaderTest.java +++ b/datarouter-prov/src/test/java/org/onap/dmaap/datarouter/provisioning/utils/LogfileLoaderTest.java @@ -20,8 +20,10 @@ package org.onap.dmaap.datarouter.provisioning.utils; -import org.junit.*; -import org.junit.rules.TemporaryFolder; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.runner.RunWith; import org.onap.dmaap.datarouter.provisioning.InternalServlet; @@ -39,10 +41,10 @@ import java.io.IOException; import java.util.Map; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import org.junit.Test; - import java.util.HashMap; @@ -55,10 +57,6 @@ public class LogfileLoaderTest { private LogfileLoader lfl = LogfileLoader.getLoader(); - @Rule - public TemporaryFolder folder = new TemporaryFolder(); - - @BeforeClass public static void init() { emf = Persistence.createEntityManagerFactory("dr-unit-tests"); @@ -78,8 +76,30 @@ public class LogfileLoaderTest { } + @After + public void tearDown() { + try { + new FileWriter("unit-test-logs/testFile1.txt").close(); + }catch (IOException ioe){ + System.out.println(ioe.getMessage()); + } + } + + @Test - public void Verify_File_Processing_when_Req_Type_LOG() throws IOException { + public void Verify_Histogram_When_Request_Type_Post() { + String fileContent = "2018-08-29-10-10-10-543.|PUB|1|1|https://dmaap-dr-prov:8443/publish/1/file123/|POST|application/vnd.att-dr.feed|2|128.0.0.9|user123|200"; + lfl.process(prepFile(fileContent)); + Map<Long, Long> expect = new HashMap<>(); + expect.put(17772L,1L); + expect.put(29353L,1L); + Map<Long, Long> actual = lfl.getHistogram(); + assertEquals(actual,expect); + } + + + @Test + public void Verify_File_Processing_when_Req_Type_LOG() { String fileContent = "2018-08-29-10-10-10-543.|LOG|1|1|url/file123|method|1|1|type|1|128.0.0.9|user123|2|1|1|1|other|1"; int[] actual = lfl.process(prepFile(fileContent)); int[] expect = {0, 1}; @@ -88,7 +108,7 @@ public class LogfileLoaderTest { @Test - public void Verify_File_Processing_when_Req_Type_EXP() throws IOException { + public void Verify_File_Processing_when_Req_Type_EXP() { String fileContent = "2018-08-29-10-10-10-543.|EXP|1|1|1|'url/file123'|method|ctype|3|other|4"; int[] actual = lfl.process(prepFile(fileContent)); int[] expect = {0, 1}; @@ -97,29 +117,17 @@ public class LogfileLoaderTest { @Test - public void Verify_Records_Prune_When_Record_Count_Is_Less_Then_Threshold() throws IOException{ + public void Verify_Records_Prune_When_Record_Count_Is_Less_Then_Threshold() { String fileContent = "2018-08-29-10-10-10-543.|PUB|1|1|https://dmaap-dr-prov:8443/publish/1/file123/|POST|application/vnd.att-dr.feed|2|128.0.0.9|user123|200"; lfl.process(prepFile(fileContent)); PowerMockito.mockStatic(Parameters.class); PowerMockito.when(Parameters.getParameter(Parameters.PROV_LOG_RETENTION)).thenReturn(new Parameters(Parameters.PROV_LOG_RETENTION, "0")); - Assert.assertEquals(lfl.pruneRecords(), false); - } - - - @Test - public void Verify_Histogram_When_Request_Type_Post() throws Exception { - String fileContent = "2018-08-29-10-10-10-543.|PUB|1|1|https://dmaap-dr-prov:8443/publish/1/file123/|POST|application/vnd.att-dr.feed|2|128.0.0.9|user123|200"; - lfl.process(prepFile(fileContent)); - Map<Long, Long> expect = new HashMap<>(); - expect.put(17772L,2L); - expect.put(29353L,1L); - Map<Long, Long> actual = lfl.getHistogram(); - assertTrue(actual.equals(expect)); + assertFalse(lfl.pruneRecords()); } - private File prepFile(String content) throws IOException{ - File file1 = folder.newFile("myfile1.txt"); + private File prepFile(String content){ + File file1 = new File("unit-test-logs/testFile1.txt"); try (FileWriter fileWriter = new FileWriter(file1)) { fileWriter.write(content); }catch (IOException e){ diff --git a/docs/data-router/data-router.rst b/docs/data-router/data-router.rst index 14b5181f..66e13bf5 100755 --- a/docs/data-router/data-router.rst +++ b/docs/data-router/data-router.rst @@ -429,6 +429,9 @@ Request Parameters: | suspend | Set to true if the subscription | Body | Boolean | | N | * true |
| | is in the suspended state | | | | | * false |
+------------------------+---------------------------------+------------------+------------+--------------+-------------+--------------------------------------+
+| decompress | Set to true if the data is to | Body | Boolean | | N | * true |
+| | be decompressed for subscriber | | | | | * false |
++------------------------+---------------------------------+------------------+------------+--------------+-------------+--------------------------------------+
| group-id | | Body | Integer | | Y | |
| | | | | | | |
+------------------------+---------------------------------+------------------+------------+--------------+-------------+--------------------------------------+
@@ -543,6 +546,9 @@ Request Parameters: | suspend | Set to true if the subscription | Body | Boolean | | N | * true |
| | is in the suspended state | | | | | * false |
+------------------------+---------------------------------+------------------+------------+--------------+-------------+--------------------------------------+
+| decompress | Set to true if the data is to | Body | Boolean | | N | * true |
+| | be decompressed for subscriber | | | | | * false |
++------------------------+---------------------------------+------------------+------------+--------------+-------------+--------------------------------------+
| group-id | | Body | Integer | | Y | |
| | | | | | | |
+------------------------+---------------------------------+------------------+------------+--------------+-------------+--------------------------------------+
@@ -64,7 +64,7 @@ <skip.docker.tag>true</skip.docker.tag> <skip.docker.push>true</skip.docker.push> <commons-codec.version>1.10</commons-codec.version> - <sonar.coverage.exclusions>datarouter-prov/src/main/java/org/onap/dmaap/datarouter/reports/*</sonar.coverage.exclusions> + <sonar.exclusions>**/src/main/java/org/onap/dmaap/datarouter/reports/**</sonar.exclusions> </properties> <modules> <module>datarouter-prov</module> @@ -103,6 +103,7 @@ <exclude>**/generated-sources/**</exclude> <exclude>**/yang-gen/**</exclude> <exclude>**/pax/**</exclude> + <exclude>**/src/main/java/org/onap/dmaap/datarouter/reports/**</exclude> </excludes> </configuration> <executions> |