summaryrefslogtreecommitdiffstats
path: root/feature-pooling-dmaap/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'feature-pooling-dmaap/src/main')
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java11
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java88
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java10
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java5
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java4
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java12
-rw-r--r--feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java4
7 files changed, 82 insertions, 52 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
index 4db8fe38..b8fb6410 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,8 +20,7 @@
package org.onap.policy.drools.pooling;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.IOException;
+import com.google.gson.JsonParseException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;
@@ -348,7 +347,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
try {
dmaapMgr.setFilter(serializer.encodeFilter(filter));
- } catch (JsonProcessingException e) {
+ } catch (JsonParseException e) {
logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e);
} catch (PoolingFeatureException e) {
@@ -393,7 +392,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
String txt = serializer.encodeMsg(msg);
dmaapMgr.publish(txt);
- } catch (JsonProcessingException e) {
+ } catch (JsonParseException e) {
logger.error("failed to serialize message for topic {} channel {}", topic, channel, e);
} catch (PoolingFeatureException e) {
@@ -682,7 +681,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener {
Method meth = current.getClass().getMethod("process", msg.getClass());
changeState((State) meth.invoke(current, msg));
- } catch (IOException e) {
+ } catch (JsonParseException e) {
logger.warn("failed to decode message for topic {}", topic, e);
} catch (NoSuchMethodException | SecurityException e) {
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
index b37c33b0..eee26ef3 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,11 +20,18 @@
package org.onap.policy.drools.pooling;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParseException;
+import java.util.HashMap;
import java.util.Map;
+import org.onap.policy.drools.pooling.message.Forward;
+import org.onap.policy.drools.pooling.message.Heartbeat;
+import org.onap.policy.drools.pooling.message.Identification;
+import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Message;
+import org.onap.policy.drools.pooling.message.Offline;
+import org.onap.policy.drools.pooling.message.Query;
/**
* Serialization helper functions.
@@ -32,10 +39,36 @@ import org.onap.policy.drools.pooling.message.Message;
public class Serializer {
/**
- * Used to encode & decode JSON messages sent & received, respectively, on
- * the internal DMaaP topic.
+ * The message type is stored in fields of this name within the JSON.
*/
- private final ObjectMapper mapper = new ObjectMapper();
+ private static final String TYPE_FIELD = "type";
+
+ /**
+ * Used to encode & decode JSON messages sent & received, respectively, on the
+ * internal DMaaP topic.
+ */
+ private final Gson gson = new Gson();
+
+ /**
+ * Maps a message subclass to its type.
+ */
+ private static final Map<Class<? extends Message>, String> class2type = new HashMap<>();
+
+ /**
+ * Maps a message type to the appropriate subclass.
+ */
+ private static final Map<String, Class<? extends Message>> type2class = new HashMap<>();
+
+ static {
+ class2type.put(Forward.class, "forward");
+ class2type.put(Heartbeat.class, "heartbeat");
+ class2type.put(Identification.class, "identification");
+ class2type.put(Leader.class, "leader");
+ class2type.put(Offline.class, "offline");
+ class2type.put(Query.class, "query");
+
+ class2type.forEach((clazz, type) -> type2class.put(type, clazz));
+ }
/**
* Constructor.
@@ -49,10 +82,10 @@ public class Serializer {
*
* @param filter filter to be encoded
* @return the filter, serialized as a JSON string
- * @throws JsonProcessingException if it cannot be serialized
+ * @throws JsonParseException if it cannot be de-serialized
*/
- public String encodeFilter(Map<String, Object> filter) throws JsonProcessingException {
- return mapper.writeValueAsString(filter);
+ public String encodeFilter(Map<String, Object> filter) throws JsonParseException {
+ return gson.toJson(filter);
}
/**
@@ -60,10 +93,19 @@ public class Serializer {
*
* @param msg message to be encoded
* @return the message, serialized as a JSON string
- * @throws JsonProcessingException if it cannot be serialized
+ * @throws JsonParseException if it cannot be de-serialized
*/
- public String encodeMsg(Message msg) throws JsonProcessingException {
- return mapper.writeValueAsString(msg);
+ public String encodeMsg(Message msg) throws JsonParseException {
+ JsonElement jsonEl = gson.toJsonTree(msg);
+
+ String type = class2type.get(msg.getClass());
+ if (type == null) {
+ throw new JsonParseException("cannot serialize " + msg.getClass());
+ }
+
+ jsonEl.getAsJsonObject().addProperty(TYPE_FIELD, type);
+
+ return gson.toJson(jsonEl);
}
/**
@@ -71,9 +113,23 @@ public class Serializer {
*
* @param msg JSON string representing the message
* @return the message
- * @throws IOException if it cannot be serialized
+ * @throws JsonParseException if it cannot be serialized
*/
- public Message decodeMsg(String msg) throws IOException {
- return mapper.readValue(msg, Message.class);
+ public Message decodeMsg(String msg) throws JsonParseException {
+ JsonElement jsonEl = gson.fromJson(msg, JsonElement.class);
+
+ JsonElement typeEl = jsonEl.getAsJsonObject().get(TYPE_FIELD);
+ if (typeEl == null) {
+ throw new JsonParseException("cannot deserialize " + Message.class
+ + " because it does not contain a field named " + TYPE_FIELD);
+
+ }
+
+ Class<? extends Message> clazz = type2class.get(typeEl.getAsString());
+ if (clazz == null) {
+ throw new JsonParseException("cannot deserialize " + typeEl);
+ }
+
+ return gson.fromJson(jsonEl, clazz);
}
}
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
index b5b64693..6be080f7 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
package org.onap.policy.drools.pooling.message;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory;
*/
public class BucketAssignments {
- @JsonIgnore
private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class);
/**
@@ -86,7 +84,6 @@ public class BucketAssignments {
*
* @return the assignment leader
*/
- @JsonIgnore
public String getLeader() {
if (hostArray == null) {
return null;
@@ -110,7 +107,6 @@ public class BucketAssignments {
* @param host host to be checked
* @return {@code true} if the host has an assignment, {@code false} otherwise
*/
- @JsonIgnore
public boolean hasAssignment(String host) {
if (hostArray == null) {
return false;
@@ -130,7 +126,6 @@ public class BucketAssignments {
*
* @return all of the hosts that have an assignment
*/
- @JsonIgnore
public Set<String> getAllHosts() {
Set<String> set = new HashSet<>();
if (hostArray == null) {
@@ -152,7 +147,6 @@ public class BucketAssignments {
* @param hashCode hash code of the item whose assignment is desired
* @return the assigned host, or {@code null} if the item has no assigned host
*/
- @JsonIgnore
public String getAssignedHost(int hashCode) {
if (hostArray == null || hostArray.length == 0) {
logger.error("no buckets have been assigned");
@@ -167,7 +161,6 @@ public class BucketAssignments {
*
* @return the number of buckets
*/
- @JsonIgnore
public int size() {
return (hostArray != null ? hostArray.length : 0);
}
@@ -178,7 +171,6 @@ public class BucketAssignments {
*
* @throws PoolingFeatureException if the assignments are invalid
*/
- @JsonIgnore
public void checkValidity() throws PoolingFeatureException {
if (hostArray == null || hostArray.length == 0) {
throw new PoolingFeatureException("missing hosts in message bucket assignments");
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
index 38acb8c9..3c34e971 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
package org.onap.policy.drools.pooling.message;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
import org.onap.policy.drools.pooling.PoolingFeatureException;
@@ -141,14 +140,12 @@ public class Forward extends Message {
this.requestId = requestId;
}
- @JsonIgnore
public boolean isExpired(long minCreateTimeMs) {
return (createTimeMs < minCreateTimeMs);
}
@Override
- @JsonIgnore
public void checkValidity() throws PoolingFeatureException {
super.checkValidity();
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
index 7464a531..80149f6e 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
package org.onap.policy.drools.pooling.message;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import org.onap.policy.drools.pooling.PoolingFeatureException;
/**
@@ -50,7 +49,6 @@ public class Leader extends MessageWithAssignments {
* indeed the leader.
*/
@Override
- @JsonIgnore
public void checkValidity() throws PoolingFeatureException {
super.checkValidity();
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
index 215cdaec..1de87867 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,20 +20,11 @@
package org.onap.policy.drools.pooling.message;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.onap.policy.drools.pooling.PoolingFeatureException;
/**
* Messages sent on the internal topic.
*/
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
-@JsonSubTypes({@Type(value = Forward.class, name = "forward"), @Type(value = Heartbeat.class, name = "heartbeat"),
- @Type(value = Identification.class, name = "identification"),
- @Type(value = Leader.class, name = "leader"), @Type(value = Offline.class, name = "offline"),
- @Type(value = Query.class, name = "query")})
public class Message {
/**
@@ -90,7 +81,6 @@ public class Message {
*
* @throws PoolingFeatureException if the message is invalid
*/
- @JsonIgnore
public void checkValidity() throws PoolingFeatureException {
if (source == null || source.isEmpty()) {
throw new PoolingFeatureException("missing message source");
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
index 4a0b8658..cfc39231 100644
--- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
+++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* ONAP
* ================================================================================
- * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
package org.onap.policy.drools.pooling.message;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import org.onap.policy.drools.pooling.PoolingFeatureException;
/**
@@ -65,7 +64,6 @@ public class MessageWithAssignments extends Message {
* If there are any assignments, it verifies there validity.
*/
@Override
- @JsonIgnore
public void checkValidity() throws PoolingFeatureException {
super.checkValidity();