From 65c5d8b92e759ec4ee9d13f048b069033b4bf097 Mon Sep 17 00:00:00 2001 From: Prudence Au Date: Mon, 16 Jul 2018 22:02:04 -0400 Subject: POMBA Context Aggregator Change-Id: I33acc1f46d35447b63e2d5438c9ce6728a2b03eb Issue-ID: LOG-519 Signed-off-by: Prudence Au POMBA Context Aggregator with JUnit tests Change-Id: I33acc1f46d35447b63e2d5438c9ce6728a2b03eb Issue-ID: LOG-519 Signed-off-by: Prudence Au --- .../onap/pomba/contextaggregator/Application.java | 33 +++ .../contextaggregator/builder/ContextBuilder.java | 103 +++++++++ .../config/BuilderConfigLoader.java | 105 +++++++++ .../contextaggregator/config/EventConfig.java | 37 ++++ .../config/EventHeaderConfig.java | 62 ++++++ .../contextaggregator/config/TransportConfig.java | 66 ++++++ .../datatypes/AggregatedModels.java | 170 +++++++++++++++ .../contextaggregator/datatypes/POAEvent.java | 143 +++++++++++++ .../exception/ContextAggregatorError.java | 51 +++++ .../exception/ContextAggregatorException.java | 50 +++++ .../publisher/EventPublisherFactory.java | 77 +++++++ .../pomba/contextaggregator/rest/RestRequest.java | 114 ++++++++++ .../service/ContextAggregatorProcessor.java | 238 +++++++++++++++++++++ .../service/ContextAggregatorService.java | 44 ++++ 14 files changed, 1293 insertions(+) create mode 100644 src/main/java/org/onap/pomba/contextaggregator/Application.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java create mode 100644 src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java (limited to 'src/main/java') diff --git a/src/main/java/org/onap/pomba/contextaggregator/Application.java b/src/main/java/org/onap/pomba/contextaggregator/Application.java new file mode 100644 index 0000000..ad3a49d --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/Application.java @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + + System.setProperty("server.ssl.enabled", "false"); + + SpringApplication.run(Application.class, args); + } + +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java b/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java new file mode 100644 index 0000000..c1ef808 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java @@ -0,0 +1,103 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.builder; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class ContextBuilder { + + private static final String HOST = "server.host"; + private static final String PORT = "server.port"; + private static final String PROTOCOL = "server.protocol"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String CONNECTION_TIMEOUT = "connection.timeout.ms"; + private static final String READ_TIMEOUT = "read.timeout.ms"; + private static final String BASE_URI = "base.uri"; + private static final String USERNAME = "basicauth.username"; + private static final String PASSWORD = "basicauth.password"; + + private Properties properties; + private String contextName; + + + /** + * Instantiates a context builder by loading the given properties file. The context name is + * extracted from the file name. File name format is expected to be [context-name].properties (ex: + * aai.properties) + * + * @param configFile + * @throws IOException + */ + public ContextBuilder(File configFile) throws IOException { + properties = new Properties(); + InputStream input = new FileInputStream(configFile.getAbsolutePath()); + properties.load(input); + String fileName = configFile.getName(); + contextName = fileName.substring(0, fileName.lastIndexOf('.')); + } + + public ContextBuilder(InputStream is, String resName) throws IOException { + properties = new Properties(); + properties.load(is); + contextName = resName.substring(0, resName.lastIndexOf('.')); + } + + public String getContextName() { + return contextName; + } + + public String getHost() { + return properties.getProperty(HOST); + } + + public int getPort() { + return Integer.parseInt(properties.getProperty(PORT)); + } + + public String getProtocol() { + return properties.getProperty(PROTOCOL); + } + + public String getSecurityProtocol() { + return properties.getProperty(SECURITY_PROTOCOL); + } + + public int getConnectionTimeout() { + return Integer.parseInt(properties.getProperty(CONNECTION_TIMEOUT)); + } + + public int getReadTimeout() { + return Integer.parseInt(properties.getProperty(READ_TIMEOUT)); + } + + public String getBaseUri() { + return properties.getProperty(BASE_URI); + } + + public String getUsername() { + return properties.getProperty(USERNAME); + } + + public String getPassword() { + return properties.getProperty(PASSWORD); + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java b/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java new file mode 100644 index 0000000..da9fe0f --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java @@ -0,0 +1,105 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.config; + +import static java.lang.String.format; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.onap.pomba.contextaggregator.builder.ContextBuilder; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.ResourcePatternResolver; + +@Configuration +public class BuilderConfigLoader { + private static Logger log = LoggerFactory.getLogger(BuilderConfigLoader.class); + private static final Resource[] EMPTY_ARRAY = new Resource[0]; + private static final String[] BUILDERS_PROPERTIES_LOCATION = + {"file:./%s/*.properties", "classpath:/%s/*.properties"}; + + @Autowired + private ResourcePatternResolver rpr; + @Value("${builders.properties.path}") + private String buildersPropertiesPath; + + + /** + * Generates a list of context builders by loading property files (*.properties) from a configured + * builders location. + * + *
+     * The location is searched in the following order: file:./${buildersPropertiesPath} and
+     * classpath:/${buildersPropertiesPath} to support override of default values.
+     *
+     * @return Returns a list of context builders
+     */
+    @Bean
+    public List contextBuilders() {
+        try {
+            final Resource[] blrdsConfig = resolveBldrsConfig();
+            if (isEmpty(blrdsConfig)) {
+                log.error(ContextAggregatorError.BUILDER_PROPERTIES_NOT_FOUND
+                        .getMessage(Arrays.toString(bldrsPropLoc2Path(buildersPropertiesPath))));
+                return Collections.emptyList();
+            }
+
+            final List contextBuilders = new ArrayList<>();
+            for (Resource r : blrdsConfig) {
+                contextBuilders.add(new ContextBuilder(r.getInputStream(), r.getFilename()));
+            }
+
+            return contextBuilders;
+        } catch (IOException ex) {
+            log.error(ContextAggregatorError.FAILED_TO_LOAD_BUILDER_PROPERTIES.getMessage(ex.getMessage()));
+        }
+
+        return Collections.emptyList();
+    }
+
+    private Resource[] resolveBldrsConfig() throws IOException {
+        for (String p : bldrsPropLoc2Path(buildersPropertiesPath)) {
+            Resource[] bldrsConfig = rpr.getResources(p);
+            if (!isEmpty(bldrsConfig)) {
+                return bldrsConfig;
+            }
+        }
+        return EMPTY_ARRAY;
+    }
+
+    private static String[] bldrsPropLoc2Path(String buildersPropertiesPath) {
+        String[] res = new String[BUILDERS_PROPERTIES_LOCATION.length];
+        int indx = 0;
+        for (String tmpl : BUILDERS_PROPERTIES_LOCATION) {
+            res[indx++] = format(tmpl, buildersPropertiesPath).replace("//", "/");
+        }
+        return res;
+    }
+
+    private static boolean isEmpty(Object[] obj) {
+        return obj == null || obj.length == 0;
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java
new file mode 100644
index 0000000..f70d0c2
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class EventConfig {
+
+    @Bean
+    public EventHeaderConfig eventHeaderConfig(@Value("${event.header.domain}") String domain,
+            @Value("${event.header.source-name}") String sourceName,
+            @Value("${event.header.event-type}") String eventType,
+            @Value("${event.header.entity-type}") String entityType,
+            @Value("${event.header.topic-entity-type}") String topicEntityType,
+            @Value("${event.header.topic-name}") String topicName) {
+
+        return new EventHeaderConfig(domain, sourceName, eventType, entityType, topicEntityType, topicName);
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java
new file mode 100644
index 0000000..a1cd4f1
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+public class EventHeaderConfig {
+
+    private String domain;
+    private String sourceName;
+    private String eventType;
+    private String entityType;
+    private String topicEntityType;
+    private String topicName;
+
+    public EventHeaderConfig(String domain, String sourceName, String eventType, String entityType,
+            String topicEntityType, String topicName) {
+        this.domain = domain;
+        this.sourceName = sourceName;
+        this.eventType = eventType;
+        this.entityType = entityType;
+        this.topicEntityType = topicEntityType;
+        this.topicName = topicName;
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public String getEntityType() {
+        return entityType;
+    }
+
+    public String getTopicEntityType() {
+        return topicEntityType;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
new file mode 100644
index 0000000..12f0cc8
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+import java.util.Properties;
+import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+
+@Configuration
+public class TransportConfig {
+
+    @Bean
+    public MRConsumer consumer(@Value("${transport.consume.host}") String host,
+            @Value("${transport.consume.port}") String port, @Value("${transport.consume.topic}") String topic,
+            @Value("${transport.consume.motsid}") String motsid, @Value("${transport.consume.pass}") String pass,
+            @Value("${transport.consume.consumergroup}") String consumerGroup,
+            @Value("${transport.consume.consumerid}") String consumerId,
+            @Value("${transport.consume.timeout}") int timeout, @Value("${transport.consume.batchsize}") int batchSize,
+            @Value("${transport.consume.msglimit}") int msgLimit, @Value("${transport.consume.type}") String type) {
+
+        String hostStr = host + ":" + port;
+
+        final MRConsumer consumer = MRClientFactory.createConsumer(hostStr, topic, motsid, pass, consumerGroup,
+                consumerId, timeout, msgLimit, type, null);
+
+        final Properties extraProps = new Properties();
+        extraProps.put("Protocol", "http");
+        ((MRConsumerImpl) consumer).setProps(extraProps);
+
+        return consumer;
+    }
+
+    @Bean
+    public EventPublisherFactory publisherFactory(@Value("${transport.publish.host}") String host,
+            @Value("${transport.publish.port}") String port, @Value("${transport.publish.topic}") String topic,
+            @Value("${transport.publish.motsid}") String motsid, @Value("${transport.publish.pass}") String pass,
+            @Value("${transport.publish.batchsize}") int batchSize, @Value("${transport.publish.maxage}") int maxAge,
+            @Value("${transport.publish.delay}") int delay, @Value("${transport.publish.type}") String type,
+            @Value("${transport.publish.partition}") String partition,
+            @Value("${transport.publish.retries}") int retries) {
+
+        String hostStr = host + ":" + port;
+        return new EventPublisherFactory(hostStr, topic, motsid, pass, batchSize, maxAge, delay, type, partition,
+                retries);
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java b/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java
new file mode 100644
index 0000000..5648b91
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java
@@ -0,0 +1,170 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.datatypes;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import org.onap.pomba.common.datatypes.ModelContext;
+import org.onap.pomba.contextaggregator.config.EventHeaderConfig;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class AggregatedModels {
+
+    @Expose
+    @SerializedName("event-header")
+    Header entityHeader;
+    @Expose
+    @SerializedName("entity")
+    POAEntity poaEntity;
+
+
+    /**
+     * Creates an event with an entity header and entity containing the models and poa-event from Dmaap
+     *
+     * @param headerConfig
+     * @param jsonContextMap
+     */
+    public AggregatedModels(EventHeaderConfig headerConfig, Map jsonContextMap, POAEvent event) {
+        entityHeader = new Header(headerConfig);
+
+        Gson gson = new GsonBuilder().create();
+        Map contextMap = new HashMap<>();
+        for (Entry entry : jsonContextMap.entrySet()) {
+            ModelContext context = null;
+            if (entry.getValue().isEmpty()) {
+                context = new ModelContext();
+                context.setVf(null);
+            } else {
+                context = gson.fromJson(entry.getValue(), ModelContext.class);
+            }
+            contextMap.put(entry.getKey(), context);
+        }
+
+        poaEntity = new POAEntity(contextMap, event);
+    }
+
+
+    /**
+     * Returns this instance as a JSON payload
+     *
+     * @return
+     */
+    public String generateJsonPayload() {
+        Gson gson = new GsonBuilder().create();
+        String payload = gson.toJson(this);
+        return payload;
+    }
+
+    public Header getEntityHeader() {
+        return entityHeader;
+    }
+
+
+
+    /**
+     * Entity header class for JSON serialization
+     */
+    private class Header {
+        @Expose
+        private String id;
+        @Expose
+        private String domain;
+        @Expose
+        @SerializedName("source-name")
+        private String sourceName;
+        @Expose
+        @SerializedName("event-type")
+        private String eventType;
+        @Expose
+        @SerializedName("entity-type")
+        private String entityType;
+        @Expose
+        @SerializedName("top-entity-type")
+        private String topEntityType;
+        @Expose
+        @SerializedName("topic-name")
+        private String topicName;
+        @Expose
+        @SerializedName("event-id")
+        private String eventId;
+
+        public Header(EventHeaderConfig config) {
+            id = UUID.randomUUID().toString();
+            domain = config.getDomain();
+            sourceName = config.getSourceName();
+            eventType = config.getEventType();
+            entityType = config.getEntityType();
+            topEntityType = config.getTopicEntityType();
+            topicName = config.getTopicName();
+            eventId = UUID.randomUUID().toString();
+        }
+
+
+        public String getId() {
+            return id;
+        }
+
+        public String getDomain() {
+            return domain;
+        }
+
+        public String getSourceName() {
+            return sourceName;
+        }
+
+        public String getEventType() {
+            return eventType;
+        }
+
+        public String getEntityType() {
+            return entityType;
+        }
+
+        public String getTopEntityType() {
+            return topEntityType;
+        }
+
+        public String getTopicName() {
+            return topicName;
+        }
+
+        public String getEventId() {
+            return eventId;
+        }
+    }
+
+
+    private class POAEntity {
+        @Expose
+        @SerializedName("poa-event")
+        POAEvent event;
+        @Expose
+        @SerializedName("context-list")
+        private Map contextMap;
+
+        public POAEntity(Map contextMap, POAEvent event) {
+            this.contextMap = contextMap;
+            this.event = event;
+        }
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java b/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java
new file mode 100644
index 0000000..d54ece4
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java
@@ -0,0 +1,143 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.datatypes;
+
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorException;
+
+public class POAEvent {
+
+    private String serviceInstanceId;
+    private String modelVersionId;
+    private String modelInvariantId;
+    private String customerId;
+    private String serviceType;
+    private String xFromAppId;
+    private String xTransactionId;
+
+    public POAEvent() {}
+
+    public String getServiceInstanceId() {
+        return serviceInstanceId;
+    }
+
+    public void setServiceInstanceId(String serviceInstanceId) {
+        this.serviceInstanceId = serviceInstanceId;
+    }
+
+    public String getModelVersionId() {
+        return modelVersionId;
+    }
+
+    public void setModelVersionId(String modelVersionId) {
+        this.modelVersionId = modelVersionId;
+    }
+
+    public String getModelInvariantId() {
+        return modelInvariantId;
+    }
+
+    public void setModelInvariantId(String modelInvariantId) {
+        this.modelInvariantId = modelInvariantId;
+    }
+
+    public String getCustomerId() {
+        return customerId;
+    }
+
+    public void setCustomerId(String customerId) {
+        this.customerId = customerId;
+    }
+
+    public String getServiceType() {
+        return serviceType;
+    }
+
+    public void setServiceType(String serviceType) {
+        this.serviceType = serviceType;
+    }
+
+    public String getxFromAppId() {
+        return xFromAppId;
+    }
+
+    public void setxFromAppId(String xFromAppId) {
+        this.xFromAppId = xFromAppId;
+    }
+
+    public String getxTransactionId() {
+        return xTransactionId;
+    }
+
+    public void setxTransactionId(String xTransactionId) {
+        this.xTransactionId = xTransactionId;
+    }
+
+    public boolean validate() throws ContextAggregatorException {
+        final String missing = " is missing";
+
+        // serviceInstanceId
+        if (getServiceInstanceId() == null || getServiceInstanceId().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+                    "serviceInstanceId" + missing);
+        }
+
+        // modelVersionId
+        if (getModelVersionId() == null || getModelVersionId().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+                    "modelVersionId" + missing);
+        }
+
+        // modelInvariantId
+        if (getModelInvariantId() == null || getModelInvariantId().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+                    "modelInvariantId" + missing);
+        }
+
+        // customerId
+        if (getCustomerId() == null || getCustomerId().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, "customerId" + missing);
+        }
+
+        // serviceType
+        if (getServiceType() == null || getServiceType().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+                    "serviceType" + missing);
+        }
+
+        // X-FromAppId
+        if (getxFromAppId() == null || getxFromAppId().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, "xFromAppId" + missing);
+        }
+
+        // X-TransactionId
+        if (getxTransactionId() == null || getxTransactionId().isEmpty()) {
+            throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+                    "xTransactionId" + missing);
+        }
+
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "POAEvent [serviceInstanceId=" + serviceInstanceId + ", modelVersionId=" + modelVersionId
+                + ", modelInvariantId=" + modelInvariantId + ", customerId=" + customerId + ", serviceType="
+                + serviceType + ", xFromAppId=" + xFromAppId + ", xTransactionId=" + xTransactionId + "]";
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
new file mode 100644
index 0000000..cee1fda
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.exception;
+
+import java.text.MessageFormat;
+
+public enum ContextAggregatorError {
+
+    GENERAL_ERROR("CA-100", "Internal error encountered: {0}"),
+    FAILED_TO_LOAD_BUILDER_PROPERTIES("CA-101", "Failed to load builder properties: {0}"),
+    JSON_PARSER_ERROR("CA-102", "Failed to parse JSON request: {0}"),
+    INVALID_EVENT_RECEIVED("CA-103", "Invalid event received: {0}"),
+    EVENT_ATTRIBUTE_MISSING("CA-104", "Mandatory attribute missing from event: {0}"),
+    FAILED_TO_GET_MODEL_DATA("CA-105", "Failed to retrieve model data for {0}, reason: {1}"),
+    PUBLISHER_SEND_ERROR("CA-106", "Error encountered when publishing messages: {0}"),
+    PUBLISHER_CLOSE_ERROR("CA-107", "Error encountered when closing publisher: {0}"),
+    FAILED_TO_PUBLISH_RESULT("CA-108", "Failed to publish model data: {0}"),
+    BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}");
+
+    private String errorId;
+    private String message;
+
+    private ContextAggregatorError(String errorId, String message) {
+        this.errorId = errorId;
+        this.message = message;
+    }
+
+    public String getErrorId() {
+        return errorId;
+    }
+
+    public String getMessage(Object... args) {
+        MessageFormat formatter = new MessageFormat(this.message);
+        return formatter.format(args);
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java
new file mode 100644
index 0000000..2533da8
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.exception;
+
+public class ContextAggregatorException extends Exception {
+
+    private static final long serialVersionUID = 6281015319496239358L;
+    private String errorId;
+
+    public ContextAggregatorException() {
+    }
+
+    public ContextAggregatorException(ContextAggregatorError error, Object... args) {
+        super(error.getMessage(args));
+        this.errorId = error.getErrorId();
+    }
+
+    public ContextAggregatorException(String message) {
+        super(message);
+    }
+
+    public ContextAggregatorException(Throwable cause) {
+        super(cause);
+    }
+
+    public ContextAggregatorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ContextAggregatorException(String message, Throwable cause, boolean enableSuppression,
+            boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java b/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java
new file mode 100644
index 0000000..588603c
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.publisher;
+
+import java.util.Properties;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
+public class EventPublisherFactory {
+
+    private String host;
+    private String topic;
+    private String motsid;
+    private String pass;
+    private int batchSize;
+    private int maxAge;
+    private int delay;
+    private String type;
+    private String partition;
+    private int retries;
+
+
+    public EventPublisherFactory(String host, String topic, String motsid, String pass, int batchSize, int maxAge,
+            int delay, String type, String partition, int retries) {
+        this.host = host;
+        this.topic = topic;
+        this.motsid = motsid;
+        this.pass = pass;
+        this.batchSize = batchSize;
+        this.maxAge = maxAge;
+        this.delay = delay;
+        this.type = type;
+        this.partition = partition;
+        this.retries = retries;
+    }
+
+    public String getPartition() {
+        return partition;
+    }
+
+    public int getRetries() {
+        return retries;
+    }
+
+    public MRBatchingPublisher createPublisher() {
+        final MRSimplerBatchPublisher publisher =
+                new MRSimplerBatchPublisher.Builder().againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic)
+                        .batchTo(batchSize, maxAge).httpThreadTime(delay).build();
+        publisher.setUsername(motsid);
+        publisher.setPassword(pass);
+        publisher.setProtocolFlag(type);
+
+        final Properties extraProps = new Properties();
+        extraProps.put("Protocol", "http");
+        extraProps.put("contenttype", "application/json");
+        publisher.setProps(extraProps);
+
+        return publisher;
+    }
+
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
new file mode 100644
index 0000000..cacc8e6
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.rest;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import org.onap.aai.restclient.client.Headers;
+import org.onap.aai.restclient.client.OperationResult;
+import org.onap.aai.restclient.client.RestClient;
+import org.onap.pomba.contextaggregator.builder.ContextBuilder;
+import org.onap.pomba.contextaggregator.datatypes.POAEvent;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.util.UriComponents;
+import org.springframework.web.util.UriComponentsBuilder;
+
+public class RestRequest {
+
+    private static final String SERVICE_INSTANCE_ID = "serviceInstanceId";
+    private static final String MODEL_VERSION_ID = "modelVersionId";
+    private static final String MODE_INVARIANT_ID = "modelInvariantId";
+    private static final String CUSTOMER_ID = "customerId";
+    private static final String SERVICE_TYPE = "serviceType";
+
+    private static final String APP_NAME = "context-aggregator";
+
+    private static Logger log = LoggerFactory.getLogger(RestRequest.class);
+
+
+    private RestRequest() {
+        // intentionally empty
+    }
+
+    /**
+     * Retrieves the model data from the given context builder
+     *
+     * @param builder
+     * @param event
+     * @return Returns the JSON response from the context builder
+     */
+    public static String getModelData(ContextBuilder builder, POAEvent event) {
+        RestClient restClient = createRestClient(builder);
+
+        OperationResult result = restClient.get(generateUri(builder, event),
+                generateHeaders(event.getxTransactionId(), builder), MediaType.APPLICATION_JSON_TYPE);
+
+        if (result.wasSuccessful()) {
+            log.debug("Retrieved model data for '" + builder.getContextName() + "': " + result.getResult());
+            return result.getResult();
+        } else {
+            // failed! return null
+            log.error(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA.getMessage(builder.getContextName(),
+                    result.getFailureCause()));
+            log.debug("Failed to retrieve model data for '" + builder.getContextName());
+            return null;
+        }
+    }
+
+    private static RestClient createRestClient(ContextBuilder builder) {
+        return new RestClient()
+                // .validateServerHostname(false)
+                // .validateServerCertChain(true)
+                // .clientCertFile(builder.getKeyStorePath())
+                // .clientCertPassword(builder.getKeyStorePassword())
+                // .trustStore(builder.getTrustStorePath())
+                .connectTimeoutMs(builder.getConnectionTimeout()).readTimeoutMs(builder.getReadTimeout());
+    }
+
+    private static String generateUri(ContextBuilder builder, POAEvent event) {
+        UriComponents uriComponents = UriComponentsBuilder.newInstance().scheme(builder.getProtocol())
+                .host(builder.getHost()).port(builder.getPort()).path(builder.getBaseUri())
+                .queryParam(SERVICE_INSTANCE_ID, event.getServiceInstanceId())
+                .queryParam(MODEL_VERSION_ID, event.getModelVersionId())
+                .queryParam(MODE_INVARIANT_ID, event.getModelInvariantId())
+                .queryParam(SERVICE_TYPE, event.getServiceType()).queryParam(CUSTOMER_ID, event.getCustomerId()).build()
+                .encode();
+        return uriComponents.toUriString();
+    }
+
+    private static Map> generateHeaders(String transactionId, ContextBuilder builder) {
+        MultivaluedMap headers = new MultivaluedHashMap<>();
+        headers.add(Headers.FROM_APP_ID, APP_NAME);
+        headers.add(Headers.TRANSACTION_ID, transactionId);
+        headers.add(Headers.AUTHORIZATION, getBasicAuthString(builder));
+        return headers;
+    }
+
+    private static String getBasicAuthString(ContextBuilder builder) {
+        String usernamePasswordString = builder.getUsername() + ":" + builder.getPassword();
+        String encodedString = Base64.getEncoder().encodeToString((usernamePasswordString).getBytes());
+        return "Basic " + encodedString;
+
+    }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
new file mode 100644
index 0000000..9d82fb6
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -0,0 +1,238 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.service;
+
+import com.att.aft.dme2.internal.gson.Gson;
+import com.att.aft.dme2.internal.gson.GsonBuilder;
+import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.onap.pomba.contextaggregator.builder.ContextBuilder;
+import org.onap.pomba.contextaggregator.config.EventHeaderConfig;
+import org.onap.pomba.contextaggregator.datatypes.AggregatedModels;
+import org.onap.pomba.contextaggregator.datatypes.POAEvent;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorException;
+import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
+import org.onap.pomba.contextaggregator.rest.RestRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ContextAggregatorProcessor implements Callable {
+
+    private Logger log = LoggerFactory.getLogger(ContextAggregatorProcessor.class);
+    private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+    private ExecutorService executor = Executors.newFixedThreadPool(10);
+    private int retriesRemaining;
+
+    @Autowired
+    private MRConsumer consumer;
+
+    @Autowired
+    private EventPublisherFactory publisherFactory;
+
+    @Autowired
+    List contextBuilders;
+
+    @Autowired
+    EventHeaderConfig eventHeaderConfig;
+
+
+    /**
+     * Parses the consumed event, retrieves model data from all context builders and publishes the
+     * aggregated models to DMaaP.
+     *
+     * @param payload
+     * @throws Exception
+     */
+    public void process(String payload) throws Exception {
+
+        log.debug("Consumed event: " + payload);
+        POAEvent event;
+        try {
+            event = parseEvent(payload);
+            log.debug("Received POA event: " + event.toString());
+        } catch (ContextAggregatorException e) {
+            log.error(ContextAggregatorError.INVALID_EVENT_RECEIVED.getMessage(e.getMessage()));
+            // TODO: publish to error topic?
+            return;
+        }
+
+        Map retrievedModels = new HashMap<>();
+        for (ContextBuilder builder : contextBuilders) {
+            log.debug("Retrieving model data for: " + builder.getContextName());
+            String modelData = RestRequest.getModelData(builder, event);
+            if (modelData == null) {
+                // If one of the Context builder return error, Aggregator will not publish the event
+                log.info("Error returned from one of the Context builder, no event will be published.");
+                return;
+            } else {
+                retrievedModels.put(builder.getContextName(), modelData);
+            }
+        }
+
+        try {
+            publishModels(new AggregatedModels(eventHeaderConfig, retrievedModels, event));
+        } catch (ContextAggregatorException e) {
+            log.error(ContextAggregatorError.FAILED_TO_PUBLISH_RESULT.getMessage(e.getMessage()));
+        }
+    }
+
+    @Override
+    public Void call() throws Exception {
+        while (true) {
+            for (String event : consumer.fetch()) {
+                executor.execute(() -> {
+                    try {
+                        process(event);
+                    } catch (Exception e) {
+                        log.error(e.getMessage());
+                    }
+                });
+            }
+        }
+    }
+
+    /**
+     * Parses, validates and returns a PAOEvent
+     *
+     * @param eventPayload
+     * @return
+     * @throws ContextAggregatorException
+     */
+    private POAEvent parseEvent(String eventPayload) throws ContextAggregatorException {
+        POAEvent event = null;
+        try {
+            event = gson.fromJson(eventPayload, POAEvent.class);
+        } catch (JsonSyntaxException e) {
+            throw new ContextAggregatorException(ContextAggregatorError.JSON_PARSER_ERROR, e.getMessage());
+        }
+        event.validate();
+        return event;
+    }
+
+    /**
+     * Publishes the aggregated models
+     *
+     * @param models
+     * @throws ContextAggregatorException
+     */
+    private void publishModels(AggregatedModels models) throws ContextAggregatorException {
+        String payload = models.generateJsonPayload();
+        log.debug("Publishing models: " + payload);
+        retriesRemaining = publisherFactory.getRetries();
+        publish(Arrays.asList(payload));
+    }
+
+    /**
+     * Publishes the given messages with a new EventPublisher instance. Will retry if problems are
+     * encountered.
+     *
+     * @param messages
+     * @throws ContextAggregatorException
+     */
+    private void publish(Collection messages) throws ContextAggregatorException {
+        MRBatchingPublisher publisher = publisherFactory.createPublisher();
+        String partition = publisherFactory.getPartition();
+        try {
+            ((MRSimplerBatchPublisher) publisher).getProps().put("partition", partition);
+            final Collection dmaapMessages = new ArrayList();
+            for (final String message : messages) {
+                dmaapMessages.add(new MRPublisher.message(partition, message));
+            }
+
+            int sent = publisher.send(dmaapMessages);
+            if (sent != messages.size()) {
+                closePublisher(publisher);
+                retryOrThrow(messages, new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR,
+                        "failed to send synchronously to partition " + publisherFactory.getPartition()));
+            }
+        } catch (Exception e) {
+            closePublisher(publisher);
+            retryOrThrow(messages,
+                    new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, e.getMessage()));
+        }
+        completeMessagePublishing(publisher);
+    }
+
+    /**
+     * Completes message publishing by closing the publisher. Will retry if an error is encountered.
+     *
+     * @param publisher
+     * @throws ContextAggregatorException
+     */
+    private void completeMessagePublishing(MRBatchingPublisher publisher) throws ContextAggregatorException {
+        List unsentMessages = closePublisher(publisher);
+        if ((unsentMessages != null) && !unsentMessages.isEmpty()) {
+            String errorString = String.valueOf(unsentMessages.size()) + " unsent message(s)";
+            retryOrThrow(unsentMessages,
+                    new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, errorString));
+        }
+    }
+
+    /**
+     * Retries to publish messages or throws the given exception if no retries are left
+     *
+     * @param messages
+     * @param exceptionToThrow
+     * @throws ContextAggregatorException
+     */
+    private void retryOrThrow(Collection messages, ContextAggregatorException exceptionToThrow)
+            throws ContextAggregatorException {
+        if (retriesRemaining <= 0) {
+            throw exceptionToThrow;
+        }
+        log.debug(String.format("Retrying to publish messages (%d %s remaining)...", retriesRemaining,
+                ((retriesRemaining == 1) ? "retry" : "retries")));
+        retriesRemaining--;
+        publish(messages);
+    }
+
+    /**
+     * Closes the event publisher and returns any unsent messages
+     *
+     * @param publisher
+     * @return
+     * @throws ContextAggregatorException
+     */
+    private List closePublisher(MRBatchingPublisher publisher) throws ContextAggregatorException {
+        try {
+            return publisher.close(20L, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e.getMessage());
+        }
+    }
+
+}
+
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java
new file mode 100644
index 0000000..9c69d0b
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.service;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.annotation.PostConstruct;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ContextAggregatorService {
+
+    @Autowired
+    private ContextAggregatorProcessor processor;
+
+    private Future processorTask;
+
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+    @PostConstruct
+    public void init() {
+
+        processorTask = executor.submit(processor);
+
+    }
+
+}
-- 
cgit 1.2.3-korg