diff options
author | Prudence Au <prudence.au@amdocs.com> | 2018-07-16 22:02:04 -0400 |
---|---|---|
committer | Prudence Au <prudence.au@amdocs.com> | 2018-08-07 10:19:39 -0400 |
commit | 65c5d8b92e759ec4ee9d13f048b069033b4bf097 (patch) | |
tree | 237dd5c8d59d5696ae9813ecaac7dfb453a9f0ca /src/main | |
parent | 4267a59d4454d03a900f46d71e288d6590f162c0 (diff) |
POMBA Context Aggregator
Change-Id: I33acc1f46d35447b63e2d5438c9ce6728a2b03eb
Issue-ID: LOG-519
Signed-off-by: Prudence Au <prudence.au@amdocs.com>
POMBA Context Aggregator with JUnit tests
Change-Id: I33acc1f46d35447b63e2d5438c9ce6728a2b03eb
Issue-ID: LOG-519
Signed-off-by: Prudence Au <prudence.au@amdocs.com>
Diffstat (limited to 'src/main')
14 files changed, 1293 insertions, 0 deletions
diff --git a/src/main/java/org/onap/pomba/contextaggregator/Application.java b/src/main/java/org/onap/pomba/contextaggregator/Application.java new file mode 100644 index 0000000..ad3a49d --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/Application.java @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + + System.setProperty("server.ssl.enabled", "false"); + + SpringApplication.run(Application.class, args); + } + +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java b/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java new file mode 100644 index 0000000..c1ef808 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/builder/ContextBuilder.java @@ -0,0 +1,103 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.builder; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +public class ContextBuilder { + + private static final String HOST = "server.host"; + private static final String PORT = "server.port"; + private static final String PROTOCOL = "server.protocol"; + private static final String SECURITY_PROTOCOL = "security.protocol"; + private static final String CONNECTION_TIMEOUT = "connection.timeout.ms"; + private static final String READ_TIMEOUT = "read.timeout.ms"; + private static final String BASE_URI = "base.uri"; + private static final String USERNAME = "basicauth.username"; + private static final String PASSWORD = "basicauth.password"; + + private Properties properties; + private String contextName; + + + /** + * Instantiates a context builder by loading the given properties file. The context name is + * extracted from the file name. File name format is expected to be [context-name].properties (ex: + * aai.properties) + * + * @param configFile + * @throws IOException + */ + public ContextBuilder(File configFile) throws IOException { + properties = new Properties(); + InputStream input = new FileInputStream(configFile.getAbsolutePath()); + properties.load(input); + String fileName = configFile.getName(); + contextName = fileName.substring(0, fileName.lastIndexOf('.')); + } + + public ContextBuilder(InputStream is, String resName) throws IOException { + properties = new Properties(); + properties.load(is); + contextName = resName.substring(0, resName.lastIndexOf('.')); + } + + public String getContextName() { + return contextName; + } + + public String getHost() { + return properties.getProperty(HOST); + } + + public int getPort() { + return Integer.parseInt(properties.getProperty(PORT)); + } + + public String getProtocol() { + return properties.getProperty(PROTOCOL); + } + + public String getSecurityProtocol() { + return properties.getProperty(SECURITY_PROTOCOL); + } + + public int getConnectionTimeout() { + return Integer.parseInt(properties.getProperty(CONNECTION_TIMEOUT)); + } + + public int getReadTimeout() { + return Integer.parseInt(properties.getProperty(READ_TIMEOUT)); + } + + public String getBaseUri() { + return properties.getProperty(BASE_URI); + } + + public String getUsername() { + return properties.getProperty(USERNAME); + } + + public String getPassword() { + return properties.getProperty(PASSWORD); + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java b/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java new file mode 100644 index 0000000..da9fe0f --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/config/BuilderConfigLoader.java @@ -0,0 +1,105 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.config; + +import static java.lang.String.format; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.onap.pomba.contextaggregator.builder.ContextBuilder; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.ResourcePatternResolver; + +@Configuration +public class BuilderConfigLoader { + private static Logger log = LoggerFactory.getLogger(BuilderConfigLoader.class); + private static final Resource[] EMPTY_ARRAY = new Resource[0]; + private static final String[] BUILDERS_PROPERTIES_LOCATION = + {"file:./%s/*.properties", "classpath:/%s/*.properties"}; + + @Autowired + private ResourcePatternResolver rpr; + @Value("${builders.properties.path}") + private String buildersPropertiesPath; + + + /** + * Generates a list of context builders by loading property files (*.properties) from a configured + * builders location. + * + * <pre> + * The location is searched in the following order: file:./${buildersPropertiesPath} and + * classpath:/${buildersPropertiesPath} to support override of default values. + * + * @return Returns a list of context builders + */ + @Bean + public List<ContextBuilder> contextBuilders() { + try { + final Resource[] blrdsConfig = resolveBldrsConfig(); + if (isEmpty(blrdsConfig)) { + log.error(ContextAggregatorError.BUILDER_PROPERTIES_NOT_FOUND + .getMessage(Arrays.toString(bldrsPropLoc2Path(buildersPropertiesPath)))); + return Collections.emptyList(); + } + + final List<ContextBuilder> contextBuilders = new ArrayList<>(); + for (Resource r : blrdsConfig) { + contextBuilders.add(new ContextBuilder(r.getInputStream(), r.getFilename())); + } + + return contextBuilders; + } catch (IOException ex) { + log.error(ContextAggregatorError.FAILED_TO_LOAD_BUILDER_PROPERTIES.getMessage(ex.getMessage())); + } + + return Collections.emptyList(); + } + + private Resource[] resolveBldrsConfig() throws IOException { + for (String p : bldrsPropLoc2Path(buildersPropertiesPath)) { + Resource[] bldrsConfig = rpr.getResources(p); + if (!isEmpty(bldrsConfig)) { + return bldrsConfig; + } + } + return EMPTY_ARRAY; + } + + private static String[] bldrsPropLoc2Path(String buildersPropertiesPath) { + String[] res = new String[BUILDERS_PROPERTIES_LOCATION.length]; + int indx = 0; + for (String tmpl : BUILDERS_PROPERTIES_LOCATION) { + res[indx++] = format(tmpl, buildersPropertiesPath).replace("//", "/"); + } + return res; + } + + private static boolean isEmpty(Object[] obj) { + return obj == null || obj.length == 0; + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java new file mode 100644 index 0000000..f70d0c2 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/config/EventConfig.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.config; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class EventConfig { + + @Bean + public EventHeaderConfig eventHeaderConfig(@Value("${event.header.domain}") String domain, + @Value("${event.header.source-name}") String sourceName, + @Value("${event.header.event-type}") String eventType, + @Value("${event.header.entity-type}") String entityType, + @Value("${event.header.topic-entity-type}") String topicEntityType, + @Value("${event.header.topic-name}") String topicName) { + + return new EventHeaderConfig(domain, sourceName, eventType, entityType, topicEntityType, topicName); + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java new file mode 100644 index 0000000..a1cd4f1 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/config/EventHeaderConfig.java @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.config; + +public class EventHeaderConfig { + + private String domain; + private String sourceName; + private String eventType; + private String entityType; + private String topicEntityType; + private String topicName; + + public EventHeaderConfig(String domain, String sourceName, String eventType, String entityType, + String topicEntityType, String topicName) { + this.domain = domain; + this.sourceName = sourceName; + this.eventType = eventType; + this.entityType = entityType; + this.topicEntityType = topicEntityType; + this.topicName = topicName; + } + + public String getDomain() { + return domain; + } + + public String getSourceName() { + return sourceName; + } + + public String getEventType() { + return eventType; + } + + public String getEntityType() { + return entityType; + } + + public String getTopicEntityType() { + return topicEntityType; + } + + public String getTopicName() { + return topicName; + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java new file mode 100644 index 0000000..12f0cc8 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/config/TransportConfig.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.config; + +import java.util.Properties; +import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import com.att.nsa.mr.client.MRClientFactory; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.impl.MRConsumerImpl; + +@Configuration +public class TransportConfig { + + @Bean + public MRConsumer consumer(@Value("${transport.consume.host}") String host, + @Value("${transport.consume.port}") String port, @Value("${transport.consume.topic}") String topic, + @Value("${transport.consume.motsid}") String motsid, @Value("${transport.consume.pass}") String pass, + @Value("${transport.consume.consumergroup}") String consumerGroup, + @Value("${transport.consume.consumerid}") String consumerId, + @Value("${transport.consume.timeout}") int timeout, @Value("${transport.consume.batchsize}") int batchSize, + @Value("${transport.consume.msglimit}") int msgLimit, @Value("${transport.consume.type}") String type) { + + String hostStr = host + ":" + port; + + final MRConsumer consumer = MRClientFactory.createConsumer(hostStr, topic, motsid, pass, consumerGroup, + consumerId, timeout, msgLimit, type, null); + + final Properties extraProps = new Properties(); + extraProps.put("Protocol", "http"); + ((MRConsumerImpl) consumer).setProps(extraProps); + + return consumer; + } + + @Bean + public EventPublisherFactory publisherFactory(@Value("${transport.publish.host}") String host, + @Value("${transport.publish.port}") String port, @Value("${transport.publish.topic}") String topic, + @Value("${transport.publish.motsid}") String motsid, @Value("${transport.publish.pass}") String pass, + @Value("${transport.publish.batchsize}") int batchSize, @Value("${transport.publish.maxage}") int maxAge, + @Value("${transport.publish.delay}") int delay, @Value("${transport.publish.type}") String type, + @Value("${transport.publish.partition}") String partition, + @Value("${transport.publish.retries}") int retries) { + + String hostStr = host + ":" + port; + return new EventPublisherFactory(hostStr, topic, motsid, pass, batchSize, maxAge, delay, type, partition, + retries); + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java b/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java new file mode 100644 index 0000000..5648b91 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/datatypes/AggregatedModels.java @@ -0,0 +1,170 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.datatypes; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import org.onap.pomba.common.datatypes.ModelContext; +import org.onap.pomba.contextaggregator.config.EventHeaderConfig; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.Expose; +import com.google.gson.annotations.SerializedName; + +public class AggregatedModels { + + @Expose + @SerializedName("event-header") + Header entityHeader; + @Expose + @SerializedName("entity") + POAEntity poaEntity; + + + /** + * Creates an event with an entity header and entity containing the models and poa-event from Dmaap + * + * @param headerConfig + * @param jsonContextMap + */ + public AggregatedModels(EventHeaderConfig headerConfig, Map<String, String> jsonContextMap, POAEvent event) { + entityHeader = new Header(headerConfig); + + Gson gson = new GsonBuilder().create(); + Map<String, ModelContext> contextMap = new HashMap<>(); + for (Entry<String, String> entry : jsonContextMap.entrySet()) { + ModelContext context = null; + if (entry.getValue().isEmpty()) { + context = new ModelContext(); + context.setVf(null); + } else { + context = gson.fromJson(entry.getValue(), ModelContext.class); + } + contextMap.put(entry.getKey(), context); + } + + poaEntity = new POAEntity(contextMap, event); + } + + + /** + * Returns this instance as a JSON payload + * + * @return + */ + public String generateJsonPayload() { + Gson gson = new GsonBuilder().create(); + String payload = gson.toJson(this); + return payload; + } + + public Header getEntityHeader() { + return entityHeader; + } + + + + /** + * Entity header class for JSON serialization + */ + private class Header { + @Expose + private String id; + @Expose + private String domain; + @Expose + @SerializedName("source-name") + private String sourceName; + @Expose + @SerializedName("event-type") + private String eventType; + @Expose + @SerializedName("entity-type") + private String entityType; + @Expose + @SerializedName("top-entity-type") + private String topEntityType; + @Expose + @SerializedName("topic-name") + private String topicName; + @Expose + @SerializedName("event-id") + private String eventId; + + public Header(EventHeaderConfig config) { + id = UUID.randomUUID().toString(); + domain = config.getDomain(); + sourceName = config.getSourceName(); + eventType = config.getEventType(); + entityType = config.getEntityType(); + topEntityType = config.getTopicEntityType(); + topicName = config.getTopicName(); + eventId = UUID.randomUUID().toString(); + } + + + public String getId() { + return id; + } + + public String getDomain() { + return domain; + } + + public String getSourceName() { + return sourceName; + } + + public String getEventType() { + return eventType; + } + + public String getEntityType() { + return entityType; + } + + public String getTopEntityType() { + return topEntityType; + } + + public String getTopicName() { + return topicName; + } + + public String getEventId() { + return eventId; + } + } + + + private class POAEntity { + @Expose + @SerializedName("poa-event") + POAEvent event; + @Expose + @SerializedName("context-list") + private Map<String, ModelContext> contextMap; + + public POAEntity(Map<String, ModelContext> contextMap, POAEvent event) { + this.contextMap = contextMap; + this.event = event; + } + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java b/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java new file mode 100644 index 0000000..d54ece4 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/datatypes/POAEvent.java @@ -0,0 +1,143 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.datatypes; + +import org.onap.pomba.contextaggregator.exception.ContextAggregatorError; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorException; + +public class POAEvent { + + private String serviceInstanceId; + private String modelVersionId; + private String modelInvariantId; + private String customerId; + private String serviceType; + private String xFromAppId; + private String xTransactionId; + + public POAEvent() {} + + public String getServiceInstanceId() { + return serviceInstanceId; + } + + public void setServiceInstanceId(String serviceInstanceId) { + this.serviceInstanceId = serviceInstanceId; + } + + public String getModelVersionId() { + return modelVersionId; + } + + public void setModelVersionId(String modelVersionId) { + this.modelVersionId = modelVersionId; + } + + public String getModelInvariantId() { + return modelInvariantId; + } + + public void setModelInvariantId(String modelInvariantId) { + this.modelInvariantId = modelInvariantId; + } + + public String getCustomerId() { + return customerId; + } + + public void setCustomerId(String customerId) { + this.customerId = customerId; + } + + public String getServiceType() { + return serviceType; + } + + public void setServiceType(String serviceType) { + this.serviceType = serviceType; + } + + public String getxFromAppId() { + return xFromAppId; + } + + public void setxFromAppId(String xFromAppId) { + this.xFromAppId = xFromAppId; + } + + public String getxTransactionId() { + return xTransactionId; + } + + public void setxTransactionId(String xTransactionId) { + this.xTransactionId = xTransactionId; + } + + public boolean validate() throws ContextAggregatorException { + final String missing = " is missing"; + + // serviceInstanceId + if (getServiceInstanceId() == null || getServiceInstanceId().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, + "serviceInstanceId" + missing); + } + + // modelVersionId + if (getModelVersionId() == null || getModelVersionId().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, + "modelVersionId" + missing); + } + + // modelInvariantId + if (getModelInvariantId() == null || getModelInvariantId().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, + "modelInvariantId" + missing); + } + + // customerId + if (getCustomerId() == null || getCustomerId().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, "customerId" + missing); + } + + // serviceType + if (getServiceType() == null || getServiceType().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, + "serviceType" + missing); + } + + // X-FromAppId + if (getxFromAppId() == null || getxFromAppId().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, "xFromAppId" + missing); + } + + // X-TransactionId + if (getxTransactionId() == null || getxTransactionId().isEmpty()) { + throw new ContextAggregatorException(ContextAggregatorError.INVALID_EVENT_RECEIVED, + "xTransactionId" + missing); + } + + return true; + } + + @Override + public String toString() { + return "POAEvent [serviceInstanceId=" + serviceInstanceId + ", modelVersionId=" + modelVersionId + + ", modelInvariantId=" + modelInvariantId + ", customerId=" + customerId + ", serviceType=" + + serviceType + ", xFromAppId=" + xFromAppId + ", xTransactionId=" + xTransactionId + "]"; + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java new file mode 100644 index 0000000..cee1fda --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorError.java @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.exception; + +import java.text.MessageFormat; + +public enum ContextAggregatorError { + + GENERAL_ERROR("CA-100", "Internal error encountered: {0}"), + FAILED_TO_LOAD_BUILDER_PROPERTIES("CA-101", "Failed to load builder properties: {0}"), + JSON_PARSER_ERROR("CA-102", "Failed to parse JSON request: {0}"), + INVALID_EVENT_RECEIVED("CA-103", "Invalid event received: {0}"), + EVENT_ATTRIBUTE_MISSING("CA-104", "Mandatory attribute missing from event: {0}"), + FAILED_TO_GET_MODEL_DATA("CA-105", "Failed to retrieve model data for {0}, reason: {1}"), + PUBLISHER_SEND_ERROR("CA-106", "Error encountered when publishing messages: {0}"), + PUBLISHER_CLOSE_ERROR("CA-107", "Error encountered when closing publisher: {0}"), + FAILED_TO_PUBLISH_RESULT("CA-108", "Failed to publish model data: {0}"), + BUILDER_PROPERTIES_NOT_FOUND("CA-109", "No builder properties were found under location(s): {0}"); + + private String errorId; + private String message; + + private ContextAggregatorError(String errorId, String message) { + this.errorId = errorId; + this.message = message; + } + + public String getErrorId() { + return errorId; + } + + public String getMessage(Object... args) { + MessageFormat formatter = new MessageFormat(this.message); + return formatter.format(args); + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java new file mode 100644 index 0000000..2533da8 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/exception/ContextAggregatorException.java @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.exception; + +public class ContextAggregatorException extends Exception { + + private static final long serialVersionUID = 6281015319496239358L; + private String errorId; + + public ContextAggregatorException() { + } + + public ContextAggregatorException(ContextAggregatorError error, Object... args) { + super(error.getMessage(args)); + this.errorId = error.getErrorId(); + } + + public ContextAggregatorException(String message) { + super(message); + } + + public ContextAggregatorException(Throwable cause) { + super(cause); + } + + public ContextAggregatorException(String message, Throwable cause) { + super(message, cause); + } + + public ContextAggregatorException(String message, Throwable cause, boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java b/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java new file mode 100644 index 0000000..588603c --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/publisher/EventPublisherFactory.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.publisher; + +import java.util.Properties; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.impl.MRConsumerImpl; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; + +public class EventPublisherFactory { + + private String host; + private String topic; + private String motsid; + private String pass; + private int batchSize; + private int maxAge; + private int delay; + private String type; + private String partition; + private int retries; + + + public EventPublisherFactory(String host, String topic, String motsid, String pass, int batchSize, int maxAge, + int delay, String type, String partition, int retries) { + this.host = host; + this.topic = topic; + this.motsid = motsid; + this.pass = pass; + this.batchSize = batchSize; + this.maxAge = maxAge; + this.delay = delay; + this.type = type; + this.partition = partition; + this.retries = retries; + } + + public String getPartition() { + return partition; + } + + public int getRetries() { + return retries; + } + + public MRBatchingPublisher createPublisher() { + final MRSimplerBatchPublisher publisher = + new MRSimplerBatchPublisher.Builder().againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic) + .batchTo(batchSize, maxAge).httpThreadTime(delay).build(); + publisher.setUsername(motsid); + publisher.setPassword(pass); + publisher.setProtocolFlag(type); + + final Properties extraProps = new Properties(); + extraProps.put("Protocol", "http"); + extraProps.put("contenttype", "application/json"); + publisher.setProps(extraProps); + + return publisher; + } + +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java new file mode 100644 index 0000000..cacc8e6 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/rest/RestRequest.java @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.rest; + +import java.util.Base64; +import java.util.List; +import java.util.Map; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedHashMap; +import javax.ws.rs.core.MultivaluedMap; +import org.onap.aai.restclient.client.Headers; +import org.onap.aai.restclient.client.OperationResult; +import org.onap.aai.restclient.client.RestClient; +import org.onap.pomba.contextaggregator.builder.ContextBuilder; +import org.onap.pomba.contextaggregator.datatypes.POAEvent; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.util.UriComponents; +import org.springframework.web.util.UriComponentsBuilder; + +public class RestRequest { + + private static final String SERVICE_INSTANCE_ID = "serviceInstanceId"; + private static final String MODEL_VERSION_ID = "modelVersionId"; + private static final String MODE_INVARIANT_ID = "modelInvariantId"; + private static final String CUSTOMER_ID = "customerId"; + private static final String SERVICE_TYPE = "serviceType"; + + private static final String APP_NAME = "context-aggregator"; + + private static Logger log = LoggerFactory.getLogger(RestRequest.class); + + + private RestRequest() { + // intentionally empty + } + + /** + * Retrieves the model data from the given context builder + * + * @param builder + * @param event + * @return Returns the JSON response from the context builder + */ + public static String getModelData(ContextBuilder builder, POAEvent event) { + RestClient restClient = createRestClient(builder); + + OperationResult result = restClient.get(generateUri(builder, event), + generateHeaders(event.getxTransactionId(), builder), MediaType.APPLICATION_JSON_TYPE); + + if (result.wasSuccessful()) { + log.debug("Retrieved model data for '" + builder.getContextName() + "': " + result.getResult()); + return result.getResult(); + } else { + // failed! return null + log.error(ContextAggregatorError.FAILED_TO_GET_MODEL_DATA.getMessage(builder.getContextName(), + result.getFailureCause())); + log.debug("Failed to retrieve model data for '" + builder.getContextName()); + return null; + } + } + + private static RestClient createRestClient(ContextBuilder builder) { + return new RestClient() + // .validateServerHostname(false) + // .validateServerCertChain(true) + // .clientCertFile(builder.getKeyStorePath()) + // .clientCertPassword(builder.getKeyStorePassword()) + // .trustStore(builder.getTrustStorePath()) + .connectTimeoutMs(builder.getConnectionTimeout()).readTimeoutMs(builder.getReadTimeout()); + } + + private static String generateUri(ContextBuilder builder, POAEvent event) { + UriComponents uriComponents = UriComponentsBuilder.newInstance().scheme(builder.getProtocol()) + .host(builder.getHost()).port(builder.getPort()).path(builder.getBaseUri()) + .queryParam(SERVICE_INSTANCE_ID, event.getServiceInstanceId()) + .queryParam(MODEL_VERSION_ID, event.getModelVersionId()) + .queryParam(MODE_INVARIANT_ID, event.getModelInvariantId()) + .queryParam(SERVICE_TYPE, event.getServiceType()).queryParam(CUSTOMER_ID, event.getCustomerId()).build() + .encode(); + return uriComponents.toUriString(); + } + + private static Map<String, List<String>> generateHeaders(String transactionId, ContextBuilder builder) { + MultivaluedMap<String, String> headers = new MultivaluedHashMap<>(); + headers.add(Headers.FROM_APP_ID, APP_NAME); + headers.add(Headers.TRANSACTION_ID, transactionId); + headers.add(Headers.AUTHORIZATION, getBasicAuthString(builder)); + return headers; + } + + private static String getBasicAuthString(ContextBuilder builder) { + String usernamePasswordString = builder.getUsername() + ":" + builder.getPassword(); + String encodedString = Base64.getEncoder().encodeToString((usernamePasswordString).getBytes()); + return "Basic " + encodedString; + + } +} diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java new file mode 100644 index 0000000..9d82fb6 --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorProcessor.java @@ -0,0 +1,238 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.service; + +import com.att.aft.dme2.internal.gson.Gson; +import com.att.aft.dme2.internal.gson.GsonBuilder; +import com.att.aft.dme2.internal.gson.JsonSyntaxException; +import com.att.nsa.mr.client.MRBatchingPublisher; +import com.att.nsa.mr.client.MRConsumer; +import com.att.nsa.mr.client.MRPublisher; +import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.onap.pomba.contextaggregator.builder.ContextBuilder; +import org.onap.pomba.contextaggregator.config.EventHeaderConfig; +import org.onap.pomba.contextaggregator.datatypes.AggregatedModels; +import org.onap.pomba.contextaggregator.datatypes.POAEvent; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorError; +import org.onap.pomba.contextaggregator.exception.ContextAggregatorException; +import org.onap.pomba.contextaggregator.publisher.EventPublisherFactory; +import org.onap.pomba.contextaggregator.rest.RestRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class ContextAggregatorProcessor implements Callable<Void> { + + private Logger log = LoggerFactory.getLogger(ContextAggregatorProcessor.class); + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + private ExecutorService executor = Executors.newFixedThreadPool(10); + private int retriesRemaining; + + @Autowired + private MRConsumer consumer; + + @Autowired + private EventPublisherFactory publisherFactory; + + @Autowired + List<ContextBuilder> contextBuilders; + + @Autowired + EventHeaderConfig eventHeaderConfig; + + + /** + * Parses the consumed event, retrieves model data from all context builders and publishes the + * aggregated models to DMaaP. + * + * @param payload + * @throws Exception + */ + public void process(String payload) throws Exception { + + log.debug("Consumed event: " + payload); + POAEvent event; + try { + event = parseEvent(payload); + log.debug("Received POA event: " + event.toString()); + } catch (ContextAggregatorException e) { + log.error(ContextAggregatorError.INVALID_EVENT_RECEIVED.getMessage(e.getMessage())); + // TODO: publish to error topic? + return; + } + + Map<String, String> retrievedModels = new HashMap<>(); + for (ContextBuilder builder : contextBuilders) { + log.debug("Retrieving model data for: " + builder.getContextName()); + String modelData = RestRequest.getModelData(builder, event); + if (modelData == null) { + // If one of the Context builder return error, Aggregator will not publish the event + log.info("Error returned from one of the Context builder, no event will be published."); + return; + } else { + retrievedModels.put(builder.getContextName(), modelData); + } + } + + try { + publishModels(new AggregatedModels(eventHeaderConfig, retrievedModels, event)); + } catch (ContextAggregatorException e) { + log.error(ContextAggregatorError.FAILED_TO_PUBLISH_RESULT.getMessage(e.getMessage())); + } + } + + @Override + public Void call() throws Exception { + while (true) { + for (String event : consumer.fetch()) { + executor.execute(() -> { + try { + process(event); + } catch (Exception e) { + log.error(e.getMessage()); + } + }); + } + } + } + + /** + * Parses, validates and returns a PAOEvent + * + * @param eventPayload + * @return + * @throws ContextAggregatorException + */ + private POAEvent parseEvent(String eventPayload) throws ContextAggregatorException { + POAEvent event = null; + try { + event = gson.fromJson(eventPayload, POAEvent.class); + } catch (JsonSyntaxException e) { + throw new ContextAggregatorException(ContextAggregatorError.JSON_PARSER_ERROR, e.getMessage()); + } + event.validate(); + return event; + } + + /** + * Publishes the aggregated models + * + * @param models + * @throws ContextAggregatorException + */ + private void publishModels(AggregatedModels models) throws ContextAggregatorException { + String payload = models.generateJsonPayload(); + log.debug("Publishing models: " + payload); + retriesRemaining = publisherFactory.getRetries(); + publish(Arrays.asList(payload)); + } + + /** + * Publishes the given messages with a new EventPublisher instance. Will retry if problems are + * encountered. + * + * @param messages + * @throws ContextAggregatorException + */ + private void publish(Collection<String> messages) throws ContextAggregatorException { + MRBatchingPublisher publisher = publisherFactory.createPublisher(); + String partition = publisherFactory.getPartition(); + try { + ((MRSimplerBatchPublisher) publisher).getProps().put("partition", partition); + final Collection<MRPublisher.message> dmaapMessages = new ArrayList<MRPublisher.message>(); + for (final String message : messages) { + dmaapMessages.add(new MRPublisher.message(partition, message)); + } + + int sent = publisher.send(dmaapMessages); + if (sent != messages.size()) { + closePublisher(publisher); + retryOrThrow(messages, new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, + "failed to send synchronously to partition " + publisherFactory.getPartition())); + } + } catch (Exception e) { + closePublisher(publisher); + retryOrThrow(messages, + new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, e.getMessage())); + } + completeMessagePublishing(publisher); + } + + /** + * Completes message publishing by closing the publisher. Will retry if an error is encountered. + * + * @param publisher + * @throws ContextAggregatorException + */ + private void completeMessagePublishing(MRBatchingPublisher publisher) throws ContextAggregatorException { + List<String> unsentMessages = closePublisher(publisher); + if ((unsentMessages != null) && !unsentMessages.isEmpty()) { + String errorString = String.valueOf(unsentMessages.size()) + " unsent message(s)"; + retryOrThrow(unsentMessages, + new ContextAggregatorException(ContextAggregatorError.PUBLISHER_SEND_ERROR, errorString)); + } + } + + /** + * Retries to publish messages or throws the given exception if no retries are left + * + * @param messages + * @param exceptionToThrow + * @throws ContextAggregatorException + */ + private void retryOrThrow(Collection<String> messages, ContextAggregatorException exceptionToThrow) + throws ContextAggregatorException { + if (retriesRemaining <= 0) { + throw exceptionToThrow; + } + log.debug(String.format("Retrying to publish messages (%d %s remaining)...", retriesRemaining, + ((retriesRemaining == 1) ? "retry" : "retries"))); + retriesRemaining--; + publish(messages); + } + + /** + * Closes the event publisher and returns any unsent messages + * + * @param publisher + * @return + * @throws ContextAggregatorException + */ + private List<String> closePublisher(MRBatchingPublisher publisher) throws ContextAggregatorException { + try { + return publisher.close(20L, TimeUnit.SECONDS).stream().map(m -> m.fMsg).collect(Collectors.toList()); + } catch (Exception e) { + throw new ContextAggregatorException(ContextAggregatorError.PUBLISHER_CLOSE_ERROR, e.getMessage()); + } + } + +} + diff --git a/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java new file mode 100644 index 0000000..9c69d0b --- /dev/null +++ b/src/main/java/org/onap/pomba/contextaggregator/service/ContextAggregatorService.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START=================================================== + * Copyright (c) 2018 Amdocs + * ============================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================================== + */ +package org.onap.pomba.contextaggregator.service; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import javax.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class ContextAggregatorService { + + @Autowired + private ContextAggregatorProcessor processor; + + private Future<Void> processorTask; + + private ExecutorService executor = Executors.newSingleThreadExecutor(); + + @PostConstruct + public void init() { + + processorTask = executor.submit(processor); + + } + +} |