summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java')
-rw-r--r--src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java70
1 files changed, 34 insertions, 36 deletions
diff --git a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
index ac68a11..25dc769 100644
--- a/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
+++ b/src/main/java/org/onap/dmaap/dmf/mr/backends/kafka/KafkaConsumerCache.java
@@ -8,16 +8,16 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
-*
+*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
- *
+ *
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
- *
+ *
*******************************************************************************/
package org.onap.dmaap.dmf.mr.backends.kafka;
@@ -96,27 +96,34 @@ public class KafkaConsumerCache {
// check for expirations pretty regularly
private static final long kDefault_SweepEverySeconds = 15;
- private enum Status {
- NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
- }
-
-
+ // private final rrNvReadable fSettings;
+ private MetricsSet fMetrics;
+ private final String fBaseZkPath;
+ private final ScheduledExecutorService fSweepScheduler;
+ private String fApiId;
+
+ private final ConnectionStateListener listener;
+
+ private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
+ private PathChildrenCache curatorConsumerCache;
+
+ private volatile Status status;
@Autowired
private DMaaPErrorMessages errorMessages;
-
+
/**
* User defined exception class for kafka consumer cache
- *
+ *
* @author nilanjana.maity
*
*/
public class KafkaConsumerCacheException extends Exception {
/**
* To throw the exception
- *
+ *
* @param t
*/
KafkaConsumerCacheException(Throwable t) {
@@ -124,7 +131,7 @@ public class KafkaConsumerCache {
}
/**
- *
+ *
* @param s
*/
public KafkaConsumerCacheException(String s) {
@@ -134,6 +141,10 @@ public class KafkaConsumerCache {
private static final long serialVersionUID = 1L;
}
+ private enum Status {
+ NOT_STARTED, CONNECTED, DISCONNECTED, SUSPENDED
+ }
+
/**
* Creates a KafkaConsumerCache object. Before it is used, you must call
* startCache()
@@ -178,7 +189,7 @@ public class KafkaConsumerCache {
/**
* Start the cache service. This must be called before any get/put
* operations.
- *
+ *
* @param mode
* DMAAP or cambria
* @param curator
@@ -251,8 +262,8 @@ public class KafkaConsumerCache {
EnsurePath ensurePath = new EnsurePath(fBaseZkPath);
ensurePath.ensure(curator.getZookeeperClient());
-
-
+
+
long freq = kDefault_SweepEverySeconds;
String strkSetting_SweepEverySeconds = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_SweepEverySeconds);
@@ -274,7 +285,7 @@ public class KafkaConsumerCache {
/**
* Gets the curator object used to establish the zookeeper connection
- *
+ *
* @param curator
* @return curator object
*/
@@ -335,7 +346,7 @@ public class KafkaConsumerCache {
* valid) In addition, this method waits for all other consumer caches in
* the cluster to release their ownership and delete their version of this
* consumer.
- *
+ *
* @param topic
* @param consumerGroupId
* @param clientId
@@ -541,12 +552,6 @@ public class KafkaConsumerCache {
return true;
}
- // private final rrNvReadable fSettings;
- private MetricsSet fMetrics;
- private final String fBaseZkPath;
- private final ScheduledExecutorService fSweepScheduler;
- private String fApiId;
-
public void setfMetrics(final MetricsSet metrics) {
this.fMetrics = metrics;
}
@@ -555,13 +560,6 @@ public class KafkaConsumerCache {
this.fApiId = id;
}
- private final ConnectionStateListener listener;
-
- private ConcurrentHashMap<String, Kafka011Consumer> fConsumers;
- private PathChildrenCache curatorConsumerCache;
-
- private volatile Status status;
-
private void handleReconnection() {
log.info("Reading current cache data from ZK and synchronizing local cache");
@@ -664,15 +662,15 @@ public class KafkaConsumerCache {
}
public void sweep() {
- final LinkedList<String> removals = new LinkedList<String>();
+ final LinkedList<String> removals = new LinkedList<>();
long mustTouchEveryMs = kDefault_MustTouchEveryMs;
String strkSetting_TouchEveryMs = AJSCPropertiesMap.getProperty(CambriaConstants.msgRtr_prop,
kSetting_TouchEveryMs);
- if (null != strkSetting_TouchEveryMs) {
+ if (null != strkSetting_TouchEveryMs && !strkSetting_TouchEveryMs.isEmpty()) {
mustTouchEveryMs = Long.parseLong(strkSetting_TouchEveryMs);
}
-
+
final long oldestAllowedTouchMs = System.currentTimeMillis() - mustTouchEveryMs;
for (Entry<String, Kafka011Consumer> e : fConsumers.entrySet()) {
@@ -692,7 +690,7 @@ public class KafkaConsumerCache {
/**
* Creating a thread to run the sweep method
- *
+ *
* @author nilanjana.maity
*
*/
@@ -707,7 +705,7 @@ public class KafkaConsumerCache {
/**
* Drops the specified consumer from the cache
- *
+ *
* @param topic
* @param consumerGroup
* @param clientId
@@ -725,5 +723,5 @@ public class KafkaConsumerCache {
}
private static final EELFLogger log = EELFManager.getInstance().getLogger(KafkaConsumerCache.class);
-
+
} \ No newline at end of file