diff options
Diffstat (limited to 'src/main/java/org')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java | 70 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java | 37 |
2 files changed, 42 insertions, 65 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java index ac68a11..25dc769 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.backends.kafka; @@ -96,27 +96,34 @@ public class KafkaConsumerCache { // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; - private enum Status { - NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED - } - - + // private final rrNvReadable fSettings; + private MetricsSet fMetrics; + private final String fBaseZkPath; + private final ScheduledExecutorService fSweepScheduler; + private String fApiId; + + private final ConnectionStateListener listener; + + private ConcurrentHashMap<String, Kafka011Consumer> fConsumers; + private PathChildrenCache curatorConsumerCache; + + private volatile Status status; @Autowired private DMaaPErrorMessages errorMessages; - + /** * User defined exception class for kafka consumer cache - * + * * @author nilanjana.maity * */ public class KafkaConsumerCacheException extends Exception { /** * To throw the exception - * + * * @param t */ KafkaConsumerCacheException(Throwable t) { @@ -124,7 +131,7 @@ public class KafkaConsumerCache { } /** - * + * * @param s */ public KafkaConsumerCacheException(String s) { @@ -134,6 +141,10 @@ public class KafkaConsumerCache { private static final long serialVersionUID = 1L; } + private enum Status { + NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED + } + /** * Creates a KafkaConsumerCache object. Before it is used, you must call * startCache() @@ -178,7 +189,7 @@ public class KafkaConsumerCache { /** * Start the cache service. This must be called before any get/put * operations. - * + * * @param mode * DMAAP or cambria * @param curator @@ -251,8 +262,8 @@ public class KafkaConsumerCache { EnsurePath ensurePath = new EnsurePath(fBaseZkPath); ensurePath.ensure(curator.getZookeeperClient()); - - + + long freq = kDefault_SweepEverySeconds; String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds); @@ -274,7 +285,7 @@ public class KafkaConsumerCache { /** * Getting the curator oject to start the zookeeper connection estabished - * + * * @param curator * @return curator object */ @@ -335,7 +346,7 @@ public class KafkaConsumerCache { * valid) In addition, this method waits for all other consumer caches in * the cluster to release their ownership and delete their version of this * consumer. - * + * * @param topic * @param consumerGroupId * @param clientId @@ -541,12 +552,6 @@ public class KafkaConsumerCache { return true; } - // private final rrNvReadable fSettings; - private MetricsSet fMetrics; - private final String fBaseZkPath; - private final ScheduledExecutorService fSweepScheduler; - private String fApiId; - public void setfMetrics(final MetricsSet metrics) { this.fMetrics = metrics; } @@ -555,13 +560,6 @@ public class KafkaConsumerCache { this.fApiId = id; } - private final ConnectionStateListener listener; - - private ConcurrentHashMap<String, Kafka011Consumer> fConsumers; - private PathChildrenCache curatorConsumerCache; - - private volatile Status status; - private void handleReconnection() { log.info("Reading current cache data from ZK and synchronizing local cache"); @@ -664,15 +662,15 @@ public class KafkaConsumerCache { } public void sweep() { - final LinkedList<String> removals = new LinkedList<String>(); + final LinkedList<String> removals = new LinkedList<>(); long mustTouchEveryMs = kDefault_MustTouchEveryMs; String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_TouchEveryMs); - if (null != strkSetting_TouchEveryMs) { + if (null != strkSetting_TouchEveryMs && !strkSetting_TouchEveryMs.isEmpty()) { mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); } - + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) { @@ -692,7 +690,7 @@ public class KafkaConsumerCache { /** * Creating a thread to run the sweep method - * + * * @author nilanjana.maity * */ @@ -707,7 +705,7 @@ public class KafkaConsumerCache { /** * This method is to drop consumer - * + * * @param topic * @param consumerGroup * @param clientId @@ -725,5 +723,5 @@ public class KafkaConsumerCache { } private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - + }
\ No newline at end of file diff --git a/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java b/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java index fe25e58..0b687ca 100644 --- a/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java +++ b/src/main/java/org/onap/dmaap/filemonitor/ServicePropertyService.java @@ -8,36 +8,29 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.filemonitor; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.Properties; - import javax.annotation.PostConstruct; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; - - -//import com.att.ssf.filemonitor.FileChangedListener; -//import com.att.ssf.filemonitor.FileMonitor; - /** * ServicePropertyService class * @author rajashree.khare @@ -68,17 +61,6 @@ public class ServicePropertyService { try { getFileList(FILE_CHANGE_LISTENER_LOC); - /*for (File file : fileList) { - FileChangedListener fileChangedListener = this.fileChangedListener; - Object filePropertiesMap = this.filePropertiesMap; - Method m = filePropertiesMap.getClass().getMethod( - "refresh", File.class); - m.invoke(filePropertiesMap, file); - FileMonitor fm = FileMonitor.getInstance(); - fm.addFileChangedListener(file, fileChangedListener, - loadOnStartup); - - }*/ } catch (Exception ex) { logger.error("Error creating property map ", ex); } @@ -87,10 +69,9 @@ public class ServicePropertyService { private void getFileList(String dirName) throws IOException { File directory = new File(dirName); - FileInputStream fis = null; if (fileList == null) - fileList = new ArrayList<File>(); + fileList = new ArrayList<>(); // get all the files that are ".json" or ".properties", from a directory // & it's sub-directories @@ -99,8 +80,8 @@ public class ServicePropertyService { for (File file : fList) { // read service property files from the configuration file if (file.isFile() && file.getPath().endsWith(USER_CONFIG_FILE)) { - try { - fis = new FileInputStream(file); + try(FileInputStream fis = new FileInputStream(file)) { + Properties prop = new Properties(); prop.load(fis); @@ -109,8 +90,6 @@ public class ServicePropertyService { } } catch (Exception ioe) { logger.error("Error reading the file stream ", ioe); - } finally { - fis.close(); } } else if (file.isDirectory()) { getFileList(file.getPath()); |