diff options
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java')
-rw-r--r-- | src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java | 70 |
1 files changed, 34 insertions, 36 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java index ac68a11..25dc769 100644 --- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java +++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java @@ -8,16 +8,16 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 -* +* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= - * + * * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * + * *******************************************************************************/ package org.onap.dmaap.dmf.mr.backends.kafka; @@ -96,27 +96,34 @@ public class KafkaConsumerCache { // check for expirations pretty regularly private static final long kDefault_SweepEverySeconds = 15; - private enum Status { - NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED - } - - + // private final rrNvReadable fSettings; + private MetricsSet fMetrics; + private final String fBaseZkPath; + private final ScheduledExecutorService fSweepScheduler; + private String fApiId; + + private final ConnectionStateListener listener; + + private ConcurrentHashMap<String, Kafka011Consumer> fConsumers; + private PathChildrenCache curatorConsumerCache; + + private volatile Status status; @Autowired private DMaaPErrorMessages errorMessages; - + /** * User defined exception class for kafka consumer cache - * + * * @author nilanjana.maity * */ public class KafkaConsumerCacheException extends Exception { /** * To throw the exception - * + * * @param t */ KafkaConsumerCacheException(Throwable t) { @@ -124,7 +131,7 @@ public class KafkaConsumerCache { } /** - * + * * @param s */ public KafkaConsumerCacheException(String s) { @@ -134,6 +141,10 @@ public class KafkaConsumerCache { private static final long serialVersionUID = 1L; } + private enum Status { + NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED + } + /** * Creates a KafkaConsumerCache object. Before it is used, you must call * startCache() @@ -178,7 +189,7 @@ public class KafkaConsumerCache { /** * Start the cache service. This must be called before any get/put * operations. - * + * * @param mode * DMAAP or cambria * @param curator @@ -251,8 +262,8 @@ public class KafkaConsumerCache { EnsurePath ensurePath = new EnsurePath(fBaseZkPath); ensurePath.ensure(curator.getZookeeperClient()); - - + + long freq = kDefault_SweepEverySeconds; String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_SweepEverySeconds); @@ -274,7 +285,7 @@ public class KafkaConsumerCache { /** * Getting the curator oject to start the zookeeper connection estabished - * + * * @param curator * @return curator object */ @@ -335,7 +346,7 @@ public class KafkaConsumerCache { * valid) In addition, this method waits for all other consumer caches in * the cluster to release their ownership and delete their version of this * consumer. - * + * * @param topic * @param consumerGroupId * @param clientId @@ -541,12 +552,6 @@ public class KafkaConsumerCache { return true; } - // private final rrNvReadable fSettings; - private MetricsSet fMetrics; - private final String fBaseZkPath; - private final ScheduledExecutorService fSweepScheduler; - private String fApiId; - public void setfMetrics(final MetricsSet metrics) { this.fMetrics = metrics; } @@ -555,13 +560,6 @@ public class KafkaConsumerCache { this.fApiId = id; } - private final ConnectionStateListener listener; - - private ConcurrentHashMap<String, Kafka011Consumer> fConsumers; - private PathChildrenCache curatorConsumerCache; - - private volatile Status status; - private void handleReconnection() { log.info("Reading current cache data from ZK and synchronizing local cache"); @@ -664,15 +662,15 @@ public class KafkaConsumerCache { } public void sweep() { - final LinkedList<String> removals = new LinkedList<String>(); + final LinkedList<String> removals = new LinkedList<>(); long mustTouchEveryMs = kDefault_MustTouchEveryMs; String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop, kSetting_TouchEveryMs); - if (null != strkSetting_TouchEveryMs) { + if (null != strkSetting_TouchEveryMs && !strkSetting_TouchEveryMs.isEmpty()) { mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs); } - + final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs; for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) { @@ -692,7 +690,7 @@ public class KafkaConsumerCache { /** * Creating a thread to run the sweep method - * + * * @author nilanjana.maity * */ @@ -707,7 +705,7 @@ public class KafkaConsumerCache { /** * This method is to drop consumer - * + * * @param topic * @param consumerGroup * @param clientId @@ -725,5 +723,5 @@ public class KafkaConsumerCache { } private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class); - + }
\ No newline at end of file |