summaryrefslogtreecommitdiffstats
path: root/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java70
-rw-r--r--src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java37
2 files changed, 42 insertions, 65 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
index ac68a11..25dc769 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;
@@ -96,27 +96,34 @@ public class KafkaConsumerCache {
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
- private enum Status {
- NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
- }
-
-
+ // private final rrNvReadable fSettings;
+ private MetricsSet fMetrics;
+ private final String fBaseZkPath;
+ private final ScheduledExecutorService fSweepScheduler;
+ private String fApiId;
+
+ private final ConnectionStateListener listener;
+
+ private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+ private PathChildrenCache curatorConsumerCache;
+
+ private volatile Status status;
@Autowired
private DMaaPErrorMessages errorMessages;
-
+
/**
* User defined exception class for kafka consumer cache
- *
+ *
* @author nilanjana.maity
*
*/
public class KafkaConsumerCacheException extends Exception {
/**
* To throw the exception
- *
+ *
* @param t
*/
KafkaConsumerCacheException(Throwable t) {
@@ -124,7 +131,7 @@ public class KafkaConsumerCache {
}
/**
- *
+ *
* @param s
*/
public KafkaConsumerCacheException(String s) {
@@ -134,6 +141,10 @@ public class KafkaConsumerCache {
private static final long serialVersionUID = 1L;
}
+ private enum Status {
+ NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+ }
+
/**
* Creates a KafkaConsumerCache object. Before it is used, you must call
* startCache()
@@ -178,7 +189,7 @@ public class KafkaConsumerCache {
/**
* Start the cache service. This must be called before any get/put
* operations.
- *
+ *
* @param mode
* DMAAP or cambria
* @param curator
@@ -251,8 +262,8 @@ public class KafkaConsumerCache {
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
-
-
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
@@ -274,7 +285,7 @@ public class KafkaConsumerCache {
/**
* Getting the curator oject to start the zookeeper connection estabished
- *
+ *
* @param curator
* @return curator object
*/
@@ -335,7 +346,7 @@ public class KafkaConsumerCache {
* valid) In addition, this method waits for all other consumer caches in
* the cluster to release their ownership and delete their version of this
* consumer.
- *
+ *
* @param topic
* @param consumerGroupId
* @param clientId
@@ -541,12 +552,6 @@ public class KafkaConsumerCache {
return true;
}
- // private final rrNvReadable fSettings;
- private MetricsSet fMetrics;
- private final String fBaseZkPath;
- private final ScheduledExecutorService fSweepScheduler;
- private String fApiId;
-
public void setfMetrics(final MetricsSet metrics) {
this.fMetrics = metrics;
}
@@ -555,13 +560,6 @@ public class KafkaConsumerCache {
this.fApiId = id;
}
- private final ConnectionStateListener listener;
-
- private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
- private PathChildrenCache curatorConsumerCache;
-
- private volatile Status status;
-
private void handleReconnection() {
log.info("Reading current cache data from ZK and synchronizing local cache");
@@ -664,15 +662,15 @@ public class KafkaConsumerCache {
}
public void sweep() {
- final LinkedList<String> removals = new LinkedList<String>();
+ final LinkedList<String> removals = new LinkedList<>();
long mustTouchEveryMs = kDefault_MustTouchEveryMs;
String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_TouchEveryMs);
- if (null != strkSetting_TouchEveryMs) {
+ if (null != strkSetting_TouchEveryMs && !strkSetting_TouchEveryMs.isEmpty()) {
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
-
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -692,7 +690,7 @@ public class KafkaConsumerCache {
/**
* Creating a thread to run the sweep method
- *
+ *
* @author nilanjana.maity
*
*/
@@ -707,7 +705,7 @@ public class KafkaConsumerCache {
/**
* This method is to drop consumer
- *
+ *
* @param topic
* @param consumerGroup
* @param clientId
@@ -725,5 +723,5 @@ public class KafkaConsumerCache {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
-
+
} \ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java b/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java
index fe25e58..0b687ca 100644
--- a/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java
+++ b/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java
@@ -8,36 +8,29 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.filemonitor;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-
import javax.annotation.PostConstruct;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
-
-
-//import com.att.ssf.filemonitor.FileChangedListener;
-//import com.att.ssf.filemonitor.FileMonitor;
-
/**
* ServicePropertyService class
* @author rajashree.khare
@@ -68,17 +61,6 @@ public class ServicePropertyService {
try {
getFileList(FILE_CHANGE_LISTENER_LOC);
- /*for (File file : fileList) {
- FileChangedListener fileChangedListener = this.fileChangedListener;
- Object filePropertiesMap = this.filePropertiesMap;
- Method m = filePropertiesMap.getClass().getMethod(
- "refresh", File.class);
- m.invoke(filePropertiesMap, file);
- FileMonitor fm = FileMonitor.getInstance();
- fm.addFileChangedListener(file, fileChangedListener,
- loadOnStartup);
-
- }*/
} catch (Exception ex) {
logger.error("Error creating property map ", ex);
}
@@ -87,10 +69,9 @@ public class ServicePropertyService {
private void getFileList(String dirName) throws IOException {
File directory = new File(dirName);
- FileInputStream fis = null;
if (fileList == null)
- fileList = new ArrayList<File>();
+ fileList = new ArrayList<>();
// get all the files that are ".json" or ".properties", from a directory
// & it's sub-directories
@@ -99,8 +80,8 @@ public class ServicePropertyService {
for (File file : fList) {
// read service property files from the configuration file
if (file.isFile() && file.getPath().endsWith(USER_CONFIG_FILE)) {
- try {
- fis = new FileInputStream(file);
+ try(FileInputStream fis = new FileInputStream(file)) {
+
Properties prop = new Properties();
prop.load(fis);
@@ -109,8 +90,6 @@ public class ServicePropertyService {
}
} catch (Exception ioe) {
logger.error("Error reading the file stream ", ioe);
- } finally {
- fis.close();
}
} else if (file.isDirectory()) {
getFileList(file.getPath());