summaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
authorPrudence Au <prudence.au@amdocs.com>2018-07-16 22:02:04 -0400
committerPrudence Au <prudence.au@amdocs.com>2018-08-07 10:19:39 -0400
commit65c5d8b92e759ec4ee9d13f048b069033b4bf097 (patch)
tree237dd5c8d59d5696ae9813ecaac7dfb453a9f0ca /src/main
parent4267a59d4454d03a900f46d71e288d6590f162c0 (diff)
POMBA Context Aggregator
Change-Id: I33acc1f46d35447b63e2d5438c9ce6728a2b03eb Issue-ID: LOG-519 Signed-off-by: Prudence Au <prudence.au@amdocs.com> POMBA Context Aggregator with JUnit tests Change-Id: I33acc1f46d35447b63e2d5438c9ce6728a2b03eb Issue-ID: LOG-519 Signed-off-by: Prudence Au <prudence.au@amdocs.com>
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/Application.java33
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java103
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java105
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java37
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java62
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java66
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java170
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java143
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java51
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java50
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java77
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java114
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java238
-rw-r--r--src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java44
14 files changed, 1293 insertions, 0 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/Application.java b/src/main/java/org/onap/pomba/contextaggregator/Application.java
new file mode 100644
index 0000000..ad3a49d
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/Application.java
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+
+ System.setProperty("server.ssl.enabled", "false");
+
+ SpringApplication.run(Application.class, args);
+ }
+
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java b/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java
new file mode 100644
index 0000000..c1ef808
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.builder;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+public class ContextBuilder {
+
+ private static final String HOST = "server.host";
+ private static final String PORT = "server.port";
+ private static final String PROTOCOL = "server.protocol";
+ private static final String SECURITY_PROTOCOL = "security.protocol";
+ private static final String CONNECTION_TIMEOUT = "connection.timeout.ms";
+ private static final String READ_TIMEOUT = "read.timeout.ms";
+ private static final String BASE_URI = "base.uri";
+ private static final String USERNAME = "basicauth.username";
+ private static final String PASSWORD = "basicauth.password";
+
+ private Properties properties;
+ private String contextName;
+
+
+ /**
+ * Instantiates a context builder by loading the given properties file. The context name is
+ * extracted from the file name. File name format is expected to be [context-name].properties (ex:
+ * aai.properties)
+ *
+ * @param configFile
+ * @throws IOException
+ */
+ public ContextBuilder(File configFile) throws IOException {
+ properties = new Properties();
+ InputStream input = new FileInputStream(configFile.getAbsolutePath());
+ properties.load(input);
+ String fileName = configFile.getName();
+ contextName = fileName.substring(0, fileName.lastIndexOf('.'));
+ }
+
+ public ContextBuilder(InputStream is, String resName) throws IOException {
+ properties = new Properties();
+ properties.load(is);
+ contextName = resName.substring(0, resName.lastIndexOf('.'));
+ }
+
+ public String getContextName() {
+ return contextName;
+ }
+
+ public String getHost() {
+ return properties.getProperty(HOST);
+ }
+
+ public int getPort() {
+ return Integer.parseInt(properties.getProperty(PORT));
+ }
+
+ public String getProtocol() {
+ return properties.getProperty(PROTOCOL);
+ }
+
+ public String getSecurityProtocol() {
+ return properties.getProperty(SECURITY_PROTOCOL);
+ }
+
+ public int getConnectionTimeout() {
+ return Integer.parseInt(properties.getProperty(CONNECTION_TIMEOUT));
+ }
+
+ public int getReadTimeout() {
+ return Integer.parseInt(properties.getProperty(READ_TIMEOUT));
+ }
+
+ public String getBaseUri() {
+ return properties.getProperty(BASE_URI);
+ }
+
+ public String getUsername() {
+ return properties.getProperty(USERNAME);
+ }
+
+ public String getPassword() {
+ return properties.getProperty(PASSWORD);
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java b/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java
new file mode 100644
index 0000000..da9fe0f
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java
@@ -0,0 +1,105 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+import static java.lang.String.format;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.onap.pomba.contextaggregator.builder.ContextBuilder;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.support.ResourcePatternResolver;
+
+@Configuration
+public class BuilderConfigLoader {
+ private static Logger log = LoggerFactory.getLogger(BuilderConfigLoader.class);
+ private static final Resource[] EMPTY_ARRAY = new Resource[0];
+ private static final String[] BUILDERS_PROPERTIES_LOCATION =
+ {"file:./%s/*.properties", "classpath:/%s/*.properties"};
+
+ @Autowired
+ private ResourcePatternResolver rpr;
+ @Value("${builders.properties.path}")
+ private String buildersPropertiesPath;
+
+
+ /**
+ * Generates a list of context builders by loading property files (*.properties) from a configured
+ * builders location.
+ *
+ * <pre>
+ * The location is searched in the following order: file:./${buildersPropertiesPath} and
+ * classpath:/${buildersPropertiesPath} to support override of default values.
+ *
+ * @return Returns a list of context builders
+ */
+ @Bean
+ public List<ContextBuilder> contextBuilders() {
+ try {
+ final Resource[] blrdsConfig = resolveBldrsConfig();
+ if (isEmpty(blrdsConfig)) {
+ log.error(ContextAggregatorError.BUILDER_PROPERTIES_NOT_FOUND
+ .getMessage(Arrays.toString(bldrsPropLoc2Path(buildersPropertiesPath))));
+ return Collections.emptyList();
+ }
+
+ final List<ContextBuilder> contextBuilders = new ArrayList<>();
+ for (Resource r : blrdsConfig) {
+ contextBuilders.add(new ContextBuilder(r.getInputStream(), r.getFilename()));
+ }
+
+ return contextBuilders;
+ } catch (IOException ex) {
+ log.error(ContextAggregatorError.FAILED_TO_LOAD_BUILDER_PROPERTIES.getMessage(ex.getMessage()));
+ }
+
+ return Collections.emptyList();
+ }
+
+ private Resource[] resolveBldrsConfig() throws IOException {
+ for (String p : bldrsPropLoc2Path(buildersPropertiesPath)) {
+ Resource[] bldrsConfig = rpr.getResources(p);
+ if (!isEmpty(bldrsConfig)) {
+ return bldrsConfig;
+ }
+ }
+ return EMPTY_ARRAY;
+ }
+
+ private static String[] bldrsPropLoc2Path(String buildersPropertiesPath) {
+ String[] res = new String[BUILDERS_PROPERTIES_LOCATION.length];
+ int indx = 0;
+ for (String tmpl : BUILDERS_PROPERTIES_LOCATION) {
+ res[indx++] = format(tmpl, buildersPropertiesPath).replace("//", "/");
+ }
+ return res;
+ }
+
+ private static boolean isEmpty(Object[] obj) {
+ return obj == null || obj.length == 0;
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java
new file mode 100644
index 0000000..f70d0c2
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class EventConfig {
+
+ @Bean
+ public EventHeaderConfig eventHeaderConfig(@Value("${event.header.domain}") String domain,
+ @Value("${event.header.source-name}") String sourceName,
+ @Value("${event.header.event-type}") String eventType,
+ @Value("${event.header.entity-type}") String entityType,
+ @Value("${event.header.topic-entity-type}") String topicEntityType,
+ @Value("${event.header.topic-name}") String topicName) {
+
+ return new EventHeaderConfig(domain, sourceName, eventType, entityType, topicEntityType, topicName);
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java
new file mode 100644
index 0000000..a1cd4f1
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+public class EventHeaderConfig {
+
+ private String domain;
+ private String sourceName;
+ private String eventType;
+ private String entityType;
+ private String topicEntityType;
+ private String topicName;
+
+ public EventHeaderConfig(String domain, String sourceName, String eventType, String entityType,
+ String topicEntityType, String topicName) {
+ this.domain = domain;
+ this.sourceName = sourceName;
+ this.eventType = eventType;
+ this.entityType = entityType;
+ this.topicEntityType = topicEntityType;
+ this.topicName = topicName;
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ public String getTopicEntityType() {
+ return topicEntityType;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
new file mode 100644
index 0000000..12f0cc8
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.config;
+
+import java.util.Properties;
+import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+
+@Configuration
+public class TransportConfig {
+
+ @Bean
+ public MRConsumer consumer(@Value("${transport.consume.host}") String host,
+ @Value("${transport.consume.port}") String port, @Value("${transport.consume.topic}") String topic,
+ @Value("${transport.consume.motsid}") String motsid, @Value("${transport.consume.pass}") String pass,
+ @Value("${transport.consume.consumergroup}") String consumerGroup,
+ @Value("${transport.consume.consumerid}") String consumerId,
+ @Value("${transport.consume.timeout}") int timeout, @Value("${transport.consume.batchsize}") int batchSize,
+ @Value("${transport.consume.msglimit}") int msgLimit, @Value("${transport.consume.type}") String type) {
+
+ String hostStr = host + ":" + port;
+
+ final MRConsumer consumer = MRClientFactory.createConsumer(hostStr, topic, motsid, pass, consumerGroup,
+ consumerId, timeout, msgLimit, type, null);
+
+ final Properties extraProps = new Properties();
+ extraProps.put("Protocol", "http");
+ ((MRConsumerImpl) consumer).setProps(extraProps);
+
+ return consumer;
+ }
+
+ @Bean
+ public EventPublisherFactory publisherFactory(@Value("${transport.publish.host}") String host,
+ @Value("${transport.publish.port}") String port, @Value("${transport.publish.topic}") String topic,
+ @Value("${transport.publish.motsid}") String motsid, @Value("${transport.publish.pass}") String pass,
+ @Value("${transport.publish.batchsize}") int batchSize, @Value("${transport.publish.maxage}") int maxAge,
+ @Value("${transport.publish.delay}") int delay, @Value("${transport.publish.type}") String type,
+ @Value("${transport.publish.partition}") String partition,
+ @Value("${transport.publish.retries}") int retries) {
+
+ String hostStr = host + ":" + port;
+ return new EventPublisherFactory(hostStr, topic, motsid, pass, batchSize, maxAge, delay, type, partition,
+ retries);
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java b/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java
new file mode 100644
index 0000000..5648b91
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java
@@ -0,0 +1,170 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.datatypes;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import org.onap.pomba.common.datatypes.ModelContext;
+import org.onap.pomba.contextaggregator.config.EventHeaderConfig;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+
+public class AggregatedModels {
+
+ @Expose
+ @SerializedName("event-header")
+ Header entityHeader;
+ @Expose
+ @SerializedName("entity")
+ POAEntity poaEntity;
+
+
+ /**
+ * Creates an event with an entity header and entity containing the models and poa-event from Dmaap
+ *
+ * @param headerConfig
+ * @param jsonContextMap
+ */
+ public AggregatedModels(EventHeaderConfig headerConfig, Map<String, String> jsonContextMap, POAEvent event) {
+ entityHeader = new Header(headerConfig);
+
+ Gson gson = new GsonBuilder().create();
+ Map<String, ModelContext> contextMap = new HashMap<>();
+ for (Entry<String, String> entry : jsonContextMap.entrySet()) {
+ ModelContext context = null;
+ if (entry.getValue().isEmpty()) {
+ context = new ModelContext();
+ context.setVf(null);
+ } else {
+ context = gson.fromJson(entry.getValue(), ModelContext.class);
+ }
+ contextMap.put(entry.getKey(), context);
+ }
+
+ poaEntity = new POAEntity(contextMap, event);
+ }
+
+
+ /**
+ * Returns this instance as a JSON payload
+ *
+ * @return
+ */
+ public String generateJsonPayload() {
+ Gson gson = new GsonBuilder().create();
+ String payload = gson.toJson(this);
+ return payload;
+ }
+
+ public Header getEntityHeader() {
+ return entityHeader;
+ }
+
+
+
+ /**
+ * Entity header class for JSON serialization
+ */
+ private class Header {
+ @Expose
+ private String id;
+ @Expose
+ private String domain;
+ @Expose
+ @SerializedName("source-name")
+ private String sourceName;
+ @Expose
+ @SerializedName("event-type")
+ private String eventType;
+ @Expose
+ @SerializedName("entity-type")
+ private String entityType;
+ @Expose
+ @SerializedName("top-entity-type")
+ private String topEntityType;
+ @Expose
+ @SerializedName("topic-name")
+ private String topicName;
+ @Expose
+ @SerializedName("event-id")
+ private String eventId;
+
+ public Header(EventHeaderConfig config) {
+ id = UUID.randomUUID().toString();
+ domain = config.getDomain();
+ sourceName = config.getSourceName();
+ eventType = config.getEventType();
+ entityType = config.getEntityType();
+ topEntityType = config.getTopicEntityType();
+ topicName = config.getTopicName();
+ eventId = UUID.randomUUID().toString();
+ }
+
+
+ public String getId() {
+ return id;
+ }
+
+ public String getDomain() {
+ return domain;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getEntityType() {
+ return entityType;
+ }
+
+ public String getTopEntityType() {
+ return topEntityType;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public String getEventId() {
+ return eventId;
+ }
+ }
+
+
+ private class POAEntity {
+ @Expose
+ @SerializedName("poa-event")
+ POAEvent event;
+ @Expose
+ @SerializedName("context-list")
+ private Map<String, ModelContext> contextMap;
+
+ public POAEntity(Map<String, ModelContext> contextMap, POAEvent event) {
+ this.contextMap = contextMap;
+ this.event = event;
+ }
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java b/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java
new file mode 100644
index 0000000..d54ece4
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java
@@ -0,0 +1,143 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.datatypes;
+
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorException;
+
+public class POAEvent {
+
+ private String serviceInstanceId;
+ private String modelVersionId;
+ private String modelInvariantId;
+ private String customerId;
+ private String serviceType;
+ private String xFromAppId;
+ private String xTransactionId;
+
+ public POAEvent() {}
+
+ public String getServiceInstanceId() {
+ return serviceInstanceId;
+ }
+
+ public void setServiceInstanceId(String serviceInstanceId) {
+ this.serviceInstanceId = serviceInstanceId;
+ }
+
+ public String getModelVersionId() {
+ return modelVersionId;
+ }
+
+ public void setModelVersionId(String modelVersionId) {
+ this.modelVersionId = modelVersionId;
+ }
+
+ public String getModelInvariantId() {
+ return modelInvariantId;
+ }
+
+ public void setModelInvariantId(String modelInvariantId) {
+ this.modelInvariantId = modelInvariantId;
+ }
+
+ public String getCustomerId() {
+ return customerId;
+ }
+
+ public void setCustomerId(String customerId) {
+ this.customerId = customerId;
+ }
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public void setServiceType(String serviceType) {
+ this.serviceType = serviceType;
+ }
+
+ public String getxFromAppId() {
+ return xFromAppId;
+ }
+
+ public void setxFromAppId(String xFromAppId) {
+ this.xFromAppId = xFromAppId;
+ }
+
+ public String getxTransactionId() {
+ return xTransactionId;
+ }
+
+ public void setxTransactionId(String xTransactionId) {
+ this.xTransactionId = xTransactionId;
+ }
+
+ public boolean validate() throws ContextAggregatorException {
+ final String missing = " is missing";
+
+ // serviceInstanceId
+ if (getServiceInstanceId() == null || getServiceInstanceId().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+ "serviceInstanceId" + missing);
+ }
+
+ // modelVersionId
+ if (getModelVersionId() == null || getModelVersionId().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+ "modelVersionId" + missing);
+ }
+
+ // modelInvariantId
+ if (getModelInvariantId() == null || getModelInvariantId().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+ "modelInvariantId" + missing);
+ }
+
+ // customerId
+ if (getCustomerId() == null || getCustomerId().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, "customerId" + missing);
+ }
+
+ // serviceType
+ if (getServiceType() == null || getServiceType().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+ "serviceType" + missing);
+ }
+
+ // X-FromAppId
+ if (getxFromAppId() == null || getxFromAppId().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, "xFromAppId" + missing);
+ }
+
+ // X-TransactionId
+ if (getxTransactionId() == null || getxTransactionId().isEmpty()) {
+ throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED,
+ "xTransactionId" + missing);
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "POAEvent [serviceInstanceId=" + serviceInstanceId + ", modelVersionId=" + modelVersionId
+ + ", modelInvariantId=" + modelInvariantId + ", customerId=" + customerId + ", serviceType="
+ + serviceType + ", xFromAppId=" + xFromAppId + ", xTransactionId=" + xTransactionId + "]";
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
new file mode 100644
index 0000000..cee1fda
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.exception;
+
+import java.text.MessageFormat;
+
+public enum ContextAggregatorError {
+
+ GENERAL_ERROR("CA-100", "Internal error encountered: {0}"),
+ FAILED_TO_LOAD_BUILDER_PROPERTIES("CA-101", "Failed to load builder properties: {0}"),
+ JSON_PARSER_ERROR("CA-102", "Failed to parse JSON request: {0}"),
+ INVALID_EVENT_RECEIVED("CA-103", "Invalid event received: {0}"),
+ EVENT_ATTRIBUTE_MISSING("CA-104", "Mandatory attribute missing from event: {0}"),
+ FAILED_TO_GET_MODEL_DATA("CA-105", "Failed to retrieve model data for {0}, reason: {1}"),
+ PUBLISHER_SEND_ERROR("CA-106", "Error encountered when publishing messages: {0}"),
+ PUBLISHER_CLOSE_ERROR("CA-107", "Error encountered when closing publisher: {0}"),
+ FAILED_TO_PUBLISH_RESULT("CA-108", "Failed to publish model data: {0}"),
+ BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}");
+
+ private String errorId;
+ private String message;
+
+ private ContextAggregatorError(String errorId, String message) {
+ this.errorId = errorId;
+ this.message = message;
+ }
+
+ public String getErrorId() {
+ return errorId;
+ }
+
+ public String getMessage(Object... args) {
+ MessageFormat formatter = new MessageFormat(this.message);
+ return formatter.format(args);
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java
new file mode 100644
index 0000000..2533da8
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.exception;
+
+public class ContextAggregatorException extends Exception {
+
+ private static final long serialVersionUID = 6281015319496239358L;
+ private String errorId;
+
+ public ContextAggregatorException() {
+ }
+
+ public ContextAggregatorException(ContextAggregatorError error, Object... args) {
+ super(error.getMessage(args));
+ this.errorId = error.getErrorId();
+ }
+
+ public ContextAggregatorException(String message) {
+ super(message);
+ }
+
+ public ContextAggregatorException(Throwable cause) {
+ super(cause);
+ }
+
+ public ContextAggregatorException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ContextAggregatorException(String message, Throwable cause, boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java b/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java
new file mode 100644
index 0000000..588603c
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java
@@ -0,0 +1,77 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.publisher;
+
+import java.util.Properties;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+
+public class EventPublisherFactory {
+
+ private String host;
+ private String topic;
+ private String motsid;
+ private String pass;
+ private int batchSize;
+ private int maxAge;
+ private int delay;
+ private String type;
+ private String partition;
+ private int retries;
+
+
+ public EventPublisherFactory(String host, String topic, String motsid, String pass, int batchSize, int maxAge,
+ int delay, String type, String partition, int retries) {
+ this.host = host;
+ this.topic = topic;
+ this.motsid = motsid;
+ this.pass = pass;
+ this.batchSize = batchSize;
+ this.maxAge = maxAge;
+ this.delay = delay;
+ this.type = type;
+ this.partition = partition;
+ this.retries = retries;
+ }
+
+ public String getPartition() {
+ return partition;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+
+ public MRBatchingPublisher createPublisher() {
+ final MRSimplerBatchPublisher publisher =
+ new MRSimplerBatchPublisher.Builder().againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic)
+ .batchTo(batchSize, maxAge).httpThreadTime(delay).build();
+ publisher.setUsername(motsid);
+ publisher.setPassword(pass);
+ publisher.setProtocolFlag(type);
+
+ final Properties extraProps = new Properties();
+ extraProps.put("Protocol", "http");
+ extraProps.put("contenttype", "application/json");
+ publisher.setProps(extraProps);
+
+ return publisher;
+ }
+
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
new file mode 100644
index 0000000..cacc8e6
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.rest;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedHashMap;
+import javax.ws.rs.core.MultivaluedMap;
+import org.onap.aai.restclient.client.Headers;
+import org.onap.aai.restclient.client.OperationResult;
+import org.onap.aai.restclient.client.RestClient;
+import org.onap.pomba.contextaggregator.builder.ContextBuilder;
+import org.onap.pomba.contextaggregator.datatypes.POAEvent;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.util.UriComponents;
+import org.springframework.web.util.UriComponentsBuilder;
+
+public class RestRequest {
+
+ private static final String SERVICE_INSTANCE_ID = "serviceInstanceId";
+ private static final String MODEL_VERSION_ID = "modelVersionId";
+ private static final String MODE_INVARIANT_ID = "modelInvariantId";
+ private static final String CUSTOMER_ID = "customerId";
+ private static final String SERVICE_TYPE = "serviceType";
+
+ private static final String APP_NAME = "context-aggregator";
+
+ private static Logger log = LoggerFactory.getLogger(RestRequest.class);
+
+
+ private RestRequest() {
+ // intentionally empty
+ }
+
+ /**
+ * Retrieves the model data from the given context builder
+ *
+ * @param builder
+ * @param event
+ * @return Returns the JSON response from the context builder
+ */
+ public static String getModelData(ContextBuilder builder, POAEvent event) {
+ RestClient restClient = createRestClient(builder);
+
+ OperationResult result = restClient.get(generateUri(builder, event),
+ generateHeaders(event.getxTransactionId(), builder), MediaType.APPLICATION_JSON_TYPE);
+
+ if (result.wasSuccessful()) {
+ log.debug("Retrieved model data for '" + builder.getContextName() + "': " + result.getResult());
+ return result.getResult();
+ } else {
+ // failed! return null
+ log.error(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA.getMessage(builder.getContextName(),
+ result.getFailureCause()));
+ log.debug("Failed to retrieve model data for '" + builder.getContextName());
+ return null;
+ }
+ }
+
+ private static RestClient createRestClient(ContextBuilder builder) {
+ return new RestClient()
+ // .validateServerHostname(false)
+ // .validateServerCertChain(true)
+ // .clientCertFile(builder.getKeyStorePath())
+ // .clientCertPassword(builder.getKeyStorePassword())
+ // .trustStore(builder.getTrustStorePath())
+ .connectTimeoutMs(builder.getConnectionTimeout()).readTimeoutMs(builder.getReadTimeout());
+ }
+
+ private static String generateUri(ContextBuilder builder, POAEvent event) {
+ UriComponents uriComponents = UriComponentsBuilder.newInstance().scheme(builder.getProtocol())
+ .host(builder.getHost()).port(builder.getPort()).path(builder.getBaseUri())
+ .queryParam(SERVICE_INSTANCE_ID, event.getServiceInstanceId())
+ .queryParam(MODEL_VERSION_ID, event.getModelVersionId())
+ .queryParam(MODE_INVARIANT_ID, event.getModelInvariantId())
+ .queryParam(SERVICE_TYPE, event.getServiceType()).queryParam(CUSTOMER_ID, event.getCustomerId()).build()
+ .encode();
+ return uriComponents.toUriString();
+ }
+
+ private static Map<String, List<String>> generateHeaders(String transactionId, ContextBuilder builder) {
+ MultivaluedMap<String, String> headers = new MultivaluedHashMap<>();
+ headers.add(Headers.FROM_APP_ID, APP_NAME);
+ headers.add(Headers.TRANSACTION_ID, transactionId);
+ headers.add(Headers.AUTHORIZATION, getBasicAuthString(builder));
+ return headers;
+ }
+
+ private static String getBasicAuthString(ContextBuilder builder) {
+ String usernamePasswordString = builder.getUsername() + ":" + builder.getPassword();
+ String encodedString = Base64.getEncoder().encodeToString((usernamePasswordString).getBytes());
+ return "Basic " + encodedString;
+
+ }
+}
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
new file mode 100644
index 0000000..9d82fb6
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java
@@ -0,0 +1,238 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.service;
+
+import com.att.aft.dme2.internal.gson.Gson;
+import com.att.aft.dme2.internal.gson.GsonBuilder;
+import com.att.aft.dme2.internal.gson.JsonSyntaxException;
+import com.att.nsa.mr.client.MRBatchingPublisher;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.MRPublisher;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.onap.pomba.contextaggregator.builder.ContextBuilder;
+import org.onap.pomba.contextaggregator.config.EventHeaderConfig;
+import org.onap.pomba.contextaggregator.datatypes.AggregatedModels;
+import org.onap.pomba.contextaggregator.datatypes.POAEvent;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorError;
+import org.onap.pomba.contextaggregator.exception.ContextAggregatorException;
+import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory;
+import org.onap.pomba.contextaggregator.rest.RestRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ContextAggregatorProcessor implements Callable<Void> {
+
+ private Logger log = LoggerFactory.getLogger(ContextAggregatorProcessor.class);
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+ private ExecutorService executor = Executors.newFixedThreadPool(10);
+ private int retriesRemaining;
+
+ @Autowired
+ private MRConsumer consumer;
+
+ @Autowired
+ private EventPublisherFactory publisherFactory;
+
+ @Autowired
+ List<ContextBuilder> contextBuilders;
+
+ @Autowired
+ EventHeaderConfig eventHeaderConfig;
+
+
+ /**
+ * Parses the consumed event, retrieves model data from all context builders and publishes the
+ * aggregated models to DMaaP.
+ *
+ * @param payload
+ * @throws Exception
+ */
+ public void process(String payload) throws Exception {
+
+ log.debug("Consumed event: " + payload);
+ POAEvent event;
+ try {
+ event = parseEvent(payload);
+ log.debug("Received POA event: " + event.toString());
+ } catch (ContextAggregatorException e) {
+ log.error(ContextAggregatorError.INVALID_EVENT_RECEIVED.getMessage(e.getMessage()));
+ // TODO: publish to error topic?
+ return;
+ }
+
+ Map<String, String> retrievedModels = new HashMap<>();
+ for (ContextBuilder builder : contextBuilders) {
+ log.debug("Retrieving model data for: " + builder.getContextName());
+ String modelData = RestRequest.getModelData(builder, event);
+ if (modelData == null) {
+ // If one of the Context builder return error, Aggregator will not publish the event
+ log.info("Error returned from one of the Context builder, no event will be published.");
+ return;
+ } else {
+ retrievedModels.put(builder.getContextName(), modelData);
+ }
+ }
+
+ try {
+ publishModels(new AggregatedModels(eventHeaderConfig, retrievedModels, event));
+ } catch (ContextAggregatorException e) {
+ log.error(ContextAggregatorError.FAILED_TO_PUBLISH_RESULT.getMessage(e.getMessage()));
+ }
+ }
+
+ @Override
+ public Void call() throws Exception {
+ while (true) {
+ for (String event : consumer.fetch()) {
+ executor.execute(() -> {
+ try {
+ process(event);
+ } catch (Exception e) {
+ log.error(e.getMessage());
+ }
+ });
+ }
+ }
+ }
+
+ /**
+ * Parses, validates and returns a PAOEvent
+ *
+ * @param eventPayload
+ * @return
+ * @throws ContextAggregatorException
+ */
+ private POAEvent parseEvent(String eventPayload) throws ContextAggregatorException {
+ POAEvent event = null;
+ try {
+ event = gson.fromJson(eventPayload, POAEvent.class);
+ } catch (JsonSyntaxException e) {
+ throw new ContextAggregatorException(ContextAggregatorError.JSON_PARSER_ERROR, e.getMessage());
+ }
+ event.validate();
+ return event;
+ }
+
+ /**
+ * Publishes the aggregated models
+ *
+ * @param models
+ * @throws ContextAggregatorException
+ */
+ private void publishModels(AggregatedModels models) throws ContextAggregatorException {
+ String payload = models.generateJsonPayload();
+ log.debug("Publishing models: " + payload);
+ retriesRemaining = publisherFactory.getRetries();
+ publish(Arrays.asList(payload));
+ }
+
+ /**
+ * Publishes the given messages with a new EventPublisher instance. Will retry if problems are
+ * encountered.
+ *
+ * @param messages
+ * @throws ContextAggregatorException
+ */
+ private void publish(Collection<String> messages) throws ContextAggregatorException {
+ MRBatchingPublisher publisher = publisherFactory.createPublisher();
+ String partition = publisherFactory.getPartition();
+ try {
+ ((MRSimplerBatchPublisher) publisher).getProps().put("partition", partition);
+ final Collection<MRPublisher.message> dmaapMessages = new ArrayList<MRPublisher.message>();
+ for (final String message : messages) {
+ dmaapMessages.add(new MRPublisher.message(partition, message));
+ }
+
+ int sent = publisher.send(dmaapMessages);
+ if (sent != messages.size()) {
+ closePublisher(publisher);
+ retryOrThrow(messages, new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR,
+ "failed to send synchronously to partition " + publisherFactory.getPartition()));
+ }
+ } catch (Exception e) {
+ closePublisher(publisher);
+ retryOrThrow(messages,
+ new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, e.getMessage()));
+ }
+ completeMessagePublishing(publisher);
+ }
+
+ /**
+ * Completes message publishing by closing the publisher. Will retry if an error is encountered.
+ *
+ * @param publisher
+ * @throws ContextAggregatorException
+ */
+ private void completeMessagePublishing(MRBatchingPublisher publisher) throws ContextAggregatorException {
+ List<String> unsentMessages = closePublisher(publisher);
+ if ((unsentMessages != null) && !unsentMessages.isEmpty()) {
+ String errorString = String.valueOf(unsentMessages.size()) + " unsent message(s)";
+ retryOrThrow(unsentMessages,
+ new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, errorString));
+ }
+ }
+
+ /**
+ * Retries to publish messages or throws the given exception if no retries are left
+ *
+ * @param messages
+ * @param exceptionToThrow
+ * @throws ContextAggregatorException
+ */
+ private void retryOrThrow(Collection<String> messages, ContextAggregatorException exceptionToThrow)
+ throws ContextAggregatorException {
+ if (retriesRemaining <= 0) {
+ throw exceptionToThrow;
+ }
+ log.debug(String.format("Retrying to publish messages (%d %s remaining)...", retriesRemaining,
+ ((retriesRemaining == 1) ? "retry" : "retries")));
+ retriesRemaining--;
+ publish(messages);
+ }
+
+ /**
+ * Closes the event publisher and returns any unsent messages
+ *
+ * @param publisher
+ * @return
+ * @throws ContextAggregatorException
+ */
+ private List<String> closePublisher(MRBatchingPublisher publisher) throws ContextAggregatorException {
+ try {
+ return publisher.close(20L, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e.getMessage());
+ }
+ }
+
+}
+
diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java
new file mode 100644
index 0000000..9c69d0b
--- /dev/null
+++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START===================================================
+ * Copyright (c) 2018 Amdocs
+ * ============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=====================================================
+ */
+package org.onap.pomba.contextaggregator.service;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.annotation.PostConstruct;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class ContextAggregatorService {
+
+ @Autowired
+ private ContextAggregatorProcessor processor;
+
+ private Future<Void> processorTask;
+
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ @PostConstruct
+ public void init() {
+
+ processorTask = executor.submit(processor);
+
+ }
+
+}