diff options
Diffstat (limited to 'feature-pooling-dmaap/src/main')
7 files changed, 82 insertions, 52 deletions
diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java index 4db8fe38..b8fb6410 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/PoolingManagerImpl.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,7 @@ package org.onap.policy.drools.pooling; -import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.IOException; +import com.google.gson.JsonParseException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Map; @@ -348,7 +347,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { try { dmaapMgr.setFilter(serializer.encodeFilter(filter)); - } catch (JsonProcessingException e) { + } catch (JsonParseException e) { logger.error("failed to encode server-side filter for topic {}, {}", topic, filter, e); } catch (PoolingFeatureException e) { @@ -393,7 +392,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { String txt = serializer.encodeMsg(msg); dmaapMgr.publish(txt); - } catch (JsonProcessingException e) { + } catch (JsonParseException e) { logger.error("failed to serialize message for topic {} channel {}", topic, channel, e); } catch (PoolingFeatureException e) { @@ -682,7 +681,7 @@ public class PoolingManagerImpl implements PoolingManager, TopicListener { Method meth = current.getClass().getMethod("process", msg.getClass()); changeState((State) meth.invoke(current, msg)); - } catch (IOException e) { + } catch (JsonParseException e) { logger.warn("failed to decode message for topic {}", topic, e); } catch (NoSuchMethodException | SecurityException e) { diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java index b37c33b0..eee26ef3 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/Serializer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,11 +20,18 @@ package org.onap.policy.drools.pooling; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import java.util.HashMap; import java.util.Map; +import org.onap.policy.drools.pooling.message.Forward; +import org.onap.policy.drools.pooling.message.Heartbeat; +import org.onap.policy.drools.pooling.message.Identification; +import org.onap.policy.drools.pooling.message.Leader; import org.onap.policy.drools.pooling.message.Message; +import org.onap.policy.drools.pooling.message.Offline; +import org.onap.policy.drools.pooling.message.Query; /** * Serialization helper functions. @@ -32,10 +39,36 @@ import org.onap.policy.drools.pooling.message.Message; public class Serializer { /** - * Used to encode & decode JSON messages sent & received, respectively, on - * the internal DMaaP topic. + * The message type is stored in fields of this name within the JSON. */ - private final ObjectMapper mapper = new ObjectMapper(); + private static final String TYPE_FIELD = "type"; + + /** + * Used to encode & decode JSON messages sent & received, respectively, on the + * internal DMaaP topic. + */ + private final Gson gson = new Gson(); + + /** + * Maps a message subclass to its type. + */ + private static final Map<Class<? extends Message>, String> class2type = new HashMap<>(); + + /** + * Maps a message type to the appropriate subclass. + */ + private static final Map<String, Class<? extends Message>> type2class = new HashMap<>(); + + static { + class2type.put(Forward.class, "forward"); + class2type.put(Heartbeat.class, "heartbeat"); + class2type.put(Identification.class, "identification"); + class2type.put(Leader.class, "leader"); + class2type.put(Offline.class, "offline"); + class2type.put(Query.class, "query"); + + class2type.forEach((clazz, type) -> type2class.put(type, clazz)); + } /** * Constructor. @@ -49,10 +82,10 @@ public class Serializer { * * @param filter filter to be encoded * @return the filter, serialized as a JSON string - * @throws JsonProcessingException if it cannot be serialized + * @throws JsonParseException if it cannot be de-serialized */ - public String encodeFilter(Map<String, Object> filter) throws JsonProcessingException { - return mapper.writeValueAsString(filter); + public String encodeFilter(Map<String, Object> filter) throws JsonParseException { + return gson.toJson(filter); } /** @@ -60,10 +93,19 @@ public class Serializer { * * @param msg message to be encoded * @return the message, serialized as a JSON string - * @throws JsonProcessingException if it cannot be serialized + * @throws JsonParseException if it cannot be de-serialized */ - public String encodeMsg(Message msg) throws JsonProcessingException { - return mapper.writeValueAsString(msg); + public String encodeMsg(Message msg) throws JsonParseException { + JsonElement jsonEl = gson.toJsonTree(msg); + + String type = class2type.get(msg.getClass()); + if (type == null) { + throw new JsonParseException("cannot serialize " + msg.getClass()); + } + + jsonEl.getAsJsonObject().addProperty(TYPE_FIELD, type); + + return gson.toJson(jsonEl); } /** @@ -71,9 +113,23 @@ public class Serializer { * * @param msg JSON string representing the message * @return the message - * @throws IOException if it cannot be serialized + * @throws JsonParseException if it cannot be serialized */ - public Message decodeMsg(String msg) throws IOException { - return mapper.readValue(msg, Message.class); + public Message decodeMsg(String msg) throws JsonParseException { + JsonElement jsonEl = gson.fromJson(msg, JsonElement.class); + + JsonElement typeEl = jsonEl.getAsJsonObject().get(TYPE_FIELD); + if (typeEl == null) { + throw new JsonParseException("cannot deserialize " + Message.class + + " because it does not contain a field named " + TYPE_FIELD); + + } + + Class<? extends Message> clazz = type2class.get(typeEl.getAsString()); + if (clazz == null) { + throw new JsonParseException("cannot deserialize " + typeEl); + } + + return gson.fromJson(jsonEl, clazz); } } diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java index b5b64693..6be080f7 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/BucketAssignments.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ package org.onap.policy.drools.pooling.message; -import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -33,7 +32,6 @@ import org.slf4j.LoggerFactory; */ public class BucketAssignments { - @JsonIgnore private static final Logger logger = LoggerFactory.getLogger(BucketAssignments.class); /** @@ -86,7 +84,6 @@ public class BucketAssignments { * * @return the assignment leader */ - @JsonIgnore public String getLeader() { if (hostArray == null) { return null; @@ -110,7 +107,6 @@ public class BucketAssignments { * @param host host to be checked * @return {@code true} if the host has an assignment, {@code false} otherwise */ - @JsonIgnore public boolean hasAssignment(String host) { if (hostArray == null) { return false; @@ -130,7 +126,6 @@ public class BucketAssignments { * * @return all of the hosts that have an assignment */ - @JsonIgnore public Set<String> getAllHosts() { Set<String> set = new HashSet<>(); if (hostArray == null) { @@ -152,7 +147,6 @@ public class BucketAssignments { * @param hashCode hash code of the item whose assignment is desired * @return the assigned host, or {@code null} if the item has no assigned host */ - @JsonIgnore public String getAssignedHost(int hashCode) { if (hostArray == null || hostArray.length == 0) { logger.error("no buckets have been assigned"); @@ -167,7 +161,6 @@ public class BucketAssignments { * * @return the number of buckets */ - @JsonIgnore public int size() { return (hostArray != null ? hostArray.length : 0); } @@ -178,7 +171,6 @@ public class BucketAssignments { * * @throws PoolingFeatureException if the assignments are invalid */ - @JsonIgnore public void checkValidity() throws PoolingFeatureException { if (hostArray == null || hostArray.length == 0) { throw new PoolingFeatureException("missing hosts in message bucket assignments"); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java index 38acb8c9..3c34e971 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Forward.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ package org.onap.policy.drools.pooling.message; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.drools.pooling.PoolingFeatureException; @@ -141,14 +140,12 @@ public class Forward extends Message { this.requestId = requestId; } - @JsonIgnore public boolean isExpired(long minCreateTimeMs) { return (createTimeMs < minCreateTimeMs); } @Override - @JsonIgnore public void checkValidity() throws PoolingFeatureException { super.checkValidity(); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java index 7464a531..80149f6e 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Leader.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ package org.onap.policy.drools.pooling.message; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.onap.policy.drools.pooling.PoolingFeatureException; /** @@ -50,7 +49,6 @@ public class Leader extends MessageWithAssignments { * indeed the leader. */ @Override - @JsonIgnore public void checkValidity() throws PoolingFeatureException { super.checkValidity(); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java index 215cdaec..1de87867 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/Message.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,20 +20,11 @@ package org.onap.policy.drools.pooling.message; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonSubTypes.Type; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.onap.policy.drools.pooling.PoolingFeatureException; /** * Messages sent on the internal topic. */ -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes({@Type(value = Forward.class, name = "forward"), @Type(value = Heartbeat.class, name = "heartbeat"), - @Type(value = Identification.class, name = "identification"), - @Type(value = Leader.class, name = "leader"), @Type(value = Offline.class, name = "offline"), - @Type(value = Query.class, name = "query")}) public class Message { /** @@ -90,7 +81,6 @@ public class Message { * * @throws PoolingFeatureException if the message is invalid */ - @JsonIgnore public void checkValidity() throws PoolingFeatureException { if (source == null || source.isEmpty()) { throw new PoolingFeatureException("missing message source"); diff --git a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java index 4a0b8658..cfc39231 100644 --- a/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java +++ b/feature-pooling-dmaap/src/main/java/org/onap/policy/drools/pooling/message/MessageWithAssignments.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * ONAP * ================================================================================ - * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ package org.onap.policy.drools.pooling.message; -import com.fasterxml.jackson.annotation.JsonIgnore; import org.onap.policy.drools.pooling.PoolingFeatureException; /** @@ -65,7 +64,6 @@ public class MessageWithAssignments extends Message { * If there are any assignments, it verifies there validity. */ @Override - @JsonIgnore public void checkValidity() throws PoolingFeatureException { super.checkValidity(); |