diff options
author | Venkata Harish K Kajur <vk250x@att.com> | 2017-06-05 23:47:50 -0400 |
---|---|---|
committer | Venkata Harish K Kajur <vk250x@att.com> | 2017-06-06 02:11:26 -0400 |
commit | fb2eb3e99edfc76cbcda61c602a4734b6f21138c (patch) | |
tree | 4c086ee649cc57eeb7618fbfb02f9f39dbdbc8e8 | |
parent | 01fe915c4eded6c50116e0c6a9fcfae008fbd23e (diff) |
[AAI-ONAP] Make necessary changes to enable dmaap
Change-Id: Iba41aca29f59dce1c937e9a916f39b69521cdfe4
Signed-off-by: Venkata Harish K Kajur <vk250x@att.com>
6 files changed, 200 insertions, 314 deletions
diff --git a/aai-traversal/bundleconfig-local/etc/appprops/aaiEventDMaaPPublisher.properties b/aai-traversal/bundleconfig-local/etc/appprops/aaiEventDMaaPPublisher.properties index 1c2a6bf..4aa7445 100644 --- a/aai-traversal/bundleconfig-local/etc/appprops/aaiEventDMaaPPublisher.properties +++ b/aai-traversal/bundleconfig-local/etc/appprops/aaiEventDMaaPPublisher.properties @@ -1,28 +1,4 @@ -TransportType=DME2 -Latitude=39.099727 -Longitude=-94.578567 -Version=1.0 -ServiceName=ONAPserverTBD -Environment=TEST -routeOffer=MR1SBKCD -SubContextPath=/ Protocol=http -MethodType=POST -username=ONAPserverTBD -password= contenttype=application/json -host=ONAPserverTBD +host=localhost:3904 topic=AAI-EVENT -partition=AAI -maxBatchSize=100 -maxAgeMs=250 -AFT_DME2_EXCHANGE_REQUEST_HANDLERS=ONAPserverTBD -AFT_DME2_EXCHANGE_REPLY_HANDLERS=ONAPserverTBD -AFT_DME2_REQ_TRACE_ON=true -AFT_ENVIRONMENT=AFTUAT -AFT_DME2_EP_CONN_TIMEOUT=15000 -AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000 -AFT_DME2_EP_READ_TIMEOUT_MS=50000 -sessionstickinessrequired=NO -DME2preferredRouterFilePath=preferredRoute.txt -MessageSentThreadOccurance=50 diff --git a/aai-traversal/src/main/ajsc/ajsc-aai_v1/ajsc-aai/v1/conf/serviceBeans.xml b/aai-traversal/src/main/ajsc/ajsc-aai_v1/ajsc-aai/v1/conf/serviceBeans.xml index f4bc26a..2b90c85 100644 --- a/aai-traversal/src/main/ajsc/ajsc-aai_v1/ajsc-aai/v1/conf/serviceBeans.xml +++ b/aai-traversal/src/main/ajsc/ajsc-aai_v1/ajsc-aai/v1/conf/serviceBeans.xml @@ -34,6 +34,7 @@ </cxf:outInterceptors> </cxf:bus> <context:component-scan base-package="org.openecomp.aai.tasks" /> + <context:component-scan base-package="org.openecomp.aai.config" /> <task:scheduler id="taskScheduler" pool-size="10" /> <task:executor id="taskExecutor" pool-size="10" @@ -41,7 +42,57 @@ <task:annotation-driven executor="taskExecutor" scheduler="taskScheduler" /> + <bean id="jmsProperties" + class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" + name="jmsProperties"> + <property name="order" value="99999" /> + <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE" /> + <property name="ignoreUnresolvablePlaceholders" value="true" /> + <property name="properties"> + <value> - + <!-- JMS --> + JMS.BROKER.URL=tcp://localhost:61446 + JMS.QUEUE.NAME=IN_QUEUE -</beans>
\ No newline at end of file + </value> + </property> + </bean> + + <!-- ActiveMQ connection factory --> + <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> + <constructor-arg index="0" value="${JMS.BROKER.URL}" /> + </bean> + + <!-- ConnectionFactory Definition --> + <bean id="connectionFactory" + class="org.springframework.jms.connection.CachingConnectionFactory"> + <constructor-arg ref="amqConnectionFactory" /> + </bean> + + <!-- Destination Queue --> + <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"> + <constructor-arg index="0" value="${JMS.QUEUE.NAME}" /> + </bean> + + <!-- JmsTemplate Definition --> + <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> + <property name="connectionFactory" ref="connectionFactory" /> + <property name="defaultDestination" ref="destinationQueue" /> + </bean> + + <!-- Message Producer --> + <bean id="messageProducer" class="org.openecomp.aai.dmaap.AAIDmaapEventJMSProducer" /> + + <!-- Message Consumer from Default Destination --> + <bean id="messageDefaultConsumer" class="org.openecomp.aai.dmaap.AAIDmaapEventJMSConsumer" /> + + <!-- Message Consumer Container for Default Destination --> + <bean + class="org.springframework.jms.listener.DefaultMessageListenerContainer"> + <property name="connectionFactory" ref="connectionFactory" /> + <property name="destinationName" value="${JMS.QUEUE.NAME}" /> + <property name="messageListener" ref="messageDefaultConsumer" /> + </bean> + +</beans>
\ No newline at end of file diff --git a/aai-traversal/src/main/java/org/openecomp/aai/config/DmaapConfig.java b/aai-traversal/src/main/java/org/openecomp/aai/config/DmaapConfig.java new file mode 100644 index 0000000..a33e653 --- /dev/null +++ b/aai-traversal/src/main/java/org/openecomp/aai/config/DmaapConfig.java @@ -0,0 +1,42 @@ +/*- + * ============LICENSE_START======================================================= + * org.openecomp.aai + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.aai.config; + +import org.apache.activemq.broker.BrokerService; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DmaapConfig { + + @Bean(destroyMethod = "stop") + public BrokerService brokerService() throws Exception { + + BrokerService broker = new BrokerService(); + broker.addConnector("tcp://localhost:61446"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setSchedulerSupport(false); + broker.start(); + + return broker; + } +} diff --git a/aai-traversal/src/main/java/org/openecomp/aai/db/DbMethHelper.java b/aai-traversal/src/main/java/org/openecomp/aai/db/DbMethHelper.java deleted file mode 100644 index 79172d9..0000000 --- a/aai-traversal/src/main/java/org/openecomp/aai/db/DbMethHelper.java +++ /dev/null @@ -1,158 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * org.openecomp.aai - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.aai.db; - -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; - -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.structure.VertexProperty; - -import org.openecomp.aai.exceptions.AAIException; -import org.openecomp.aai.introspection.Introspector; -import org.openecomp.aai.introspection.Loader; -import org.openecomp.aai.introspection.exceptions.AAIUnknownObjectException; -import org.openecomp.aai.parsers.query.QueryParser; -import org.openecomp.aai.parsers.relationship.RelationshipToURI; -import org.openecomp.aai.query.builder.QueryBuilder; -import org.openecomp.aai.serialization.engines.TransactionalGraphEngine; - -public class DbMethHelper { - - private final Loader loader; - private final TransactionalGraphEngine engine; - - protected DbMethHelper() { - this.loader = null; - this.engine = null; - } - public DbMethHelper(Loader loader, TransactionalGraphEngine engine) { - this.loader = loader; - this.engine = engine; - } - /** - * - * @param type - * @param map - form [{type}.{propname}:{value}] - * @return - * @throws UnsupportedEncodingException - * @throws AAIException - */ - public Optional<Vertex> searchVertexByIdentityMap(String type, Map<String, Object> map) throws AAIException { - - Introspector relationship = constructRelationship(type, map); - RelationshipToURI parser; - URI uri; - QueryParser queryParser; - try { - parser = new RelationshipToURI(loader, relationship); - uri = parser.getUri(); - queryParser = this.engine.getQueryBuilder().createQueryFromURI(uri); - } catch (UnsupportedEncodingException e) { - throw new AAIException("AAI_3000"); - } - - List<Vertex> results = queryParser.getQueryBuilder().toList(); - - return reduceToSingleVertex(results, map); - } - - /** - * @param type - * @param map - form [{propname}:{value}] - * @return - * @throws AAIException - */ - public Optional<Vertex> locateUniqueVertex(String type, Map<String, Object> map) throws AAIException { - - return reduceToSingleVertex(locateUniqueVertices(type, map), map); - } - - public List<Vertex> locateUniqueVertices(String type, Map<String, Object> map) throws AAIException { - Introspector obj = this.createIntrospectorFromMap(type, map); - QueryBuilder builder = this.engine.getQueryBuilder().exactMatchQuery(obj); - - return builder.toList(); - } - private Introspector constructRelationship(String type, Map<String, Object> map) throws AAIUnknownObjectException { - final Introspector relationship = loader.introspectorFromName("relationship"); - relationship.setValue("related-to", type); - final List<Object> data = relationship.getValue("relationship-data"); - for (Entry<String, Object> entry : map.entrySet()) { - final Introspector dataObj = loader.introspectorFromName("relationship-data"); - dataObj.setValue("relationship-key", entry.getKey()); - dataObj.setValue("relationship-value", entry.getValue()); - data.add(dataObj.getUnderlyingObject()); - } - - return relationship; - } - - private Introspector createIntrospectorFromMap(String targetNodeType, Map<String, Object> propHash) throws AAIUnknownObjectException { - final Introspector result = loader.introspectorFromName(targetNodeType); - for (Entry<String, Object> entry : propHash.entrySet()) { - result.setValue(entry.getKey(), entry.getValue()); - } - return result; - } - - private Optional<Vertex> reduceToSingleVertex(List<Vertex> vertices, Map<String, Object> map) throws AAIException { - if (vertices.isEmpty()){ - return Optional.empty(); - } else if (vertices.size() > 1) { - throw new AAIException("AAI_6112", "More than one Node found by getUniqueNode for params: " + map); - } - - return Optional.of(vertices.get(0)); - } - public List<String> getVertexProperties(Vertex v) { - List<String> retArr = new ArrayList<>(); - if( v == null ){ - retArr.add("null Node object passed to showPropertiesForNode()\n"); - } - else { - String nodeType; - Object ob = v.<Object>property("aai-node-type").orElse(null); - if( ob == null ){ - nodeType = "null"; - } - else{ - nodeType = ob.toString(); - } - - retArr.add(" AAINodeType/VtxID for this Node = [" + nodeType + "/" + v.id() + "]"); - retArr.add(" Property Detail: "); - Iterator<VertexProperty<Object>> pI = v.properties(); - while( pI.hasNext() ){ - VertexProperty<Object> tp = pI.next(); - Object val = tp.value(); - retArr.add("Prop: [" + tp.key() + "], val = [" + val + "] "); - } - } - return retArr; - } -} diff --git a/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSConsumer.java b/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSConsumer.java index e0c4ec7..a30ebc0 100644 --- a/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSConsumer.java +++ b/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSConsumer.java @@ -20,144 +20,119 @@ package org.openecomp.aai.dmaap; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.Properties; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import org.apache.log4j.MDC; +import org.json.JSONException; +import org.json.JSONObject; +import org.openecomp.aai.logging.ErrorLogHelper; +import org.openecomp.aai.util.AAIConstants; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import javax.ws.rs.core.MediaType; - -import org.apache.log4j.MDC; -import org.eclipse.jetty.util.security.Password; -import org.json.JSONException; -import org.json.JSONObject; - -import org.openecomp.aai.logging.ErrorLogHelper; -import org.openecomp.aai.util.AAIConstants; -import com.att.eelf.configuration.EELFLogger; -import com.att.eelf.configuration.EELFManager; -//import com.att.nsa.mr.client.MRBatchingPublisher; -//import com.att.nsa.mr.client.MRClientFactory; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; public class AAIDmaapEventJMSConsumer implements MessageListener { private static final EELFLogger LOGGER = EELFManager.getInstance().getLogger(AAIDmaapEventJMSConsumer.class); - private final static String COMPONENT = "aaiDmaapEvent"; - //private MRBatchingPublisher adp = null; + private Client httpClient; - private Properties props; + private Properties aaiEventProps; + private String aaiEventUrl = ""; - private String username; - private String password; - private String contentType; + public AAIDmaapEventJMSConsumer() throws org.apache.commons.configuration.ConfigurationException { + super(); + try { - private String url; - private Client client; + if (this.httpClient == null) { + FileReader reader = new FileReader(new File(AAIConstants.AAI_EVENT_DMAAP_PROPS)); + aaiEventProps = new Properties(); + aaiEventProps.load(reader); + reader.close(); - public AAIDmaapEventJMSConsumer() throws org.apache.commons.configuration.ConfigurationException { - //super(); - - //if (this.adp == null) { - //try { - //FileReader reader = new FileReader(new File(AAIConstants.AAI_EVENT_DMAAP_PROPS)); - //props = new Properties(); - //props.load(reader); - //props.setProperty("DME2preferredRouterFilePath", AAIConstants.AAI_HOME_ETC_APP_PROPERTIES + "preferredRoute.txt"); - //if (props.getProperty("password") != null && props.getProperty("password").startsWith("OBF:")) { - //props.setProperty("password", Password.deobfuscate(props.getProperty("password"))); - //} - //this.adp = MRClientFactory.createBatchingPublisher(props); - - //String host = props.getProperty("host"); - //String topic = props.getProperty("topic"); - //String protocol = props.getProperty("Protocol"); - - //username = props.getProperty("username"); - //password = props.getProperty("password"); - //contentType = props.getProperty("contenttype"); - - //url = protocol + "://" + host + "/events/" + topic; - //client = Client.create(); - //client.addFilter(new HTTPBasicAuthFilter(username, password)); - - //} catch (IOException e) { - //ErrorLogHelper.logError("AAI_4000", "Error updating dmaap config file for aai event."); - //} - //} + String host = aaiEventProps.getProperty("host"); + String topic = aaiEventProps.getProperty("topic"); + String protocol = aaiEventProps.getProperty("Protocol"); + + aaiEventUrl = protocol + "://" + host + "/events/" + topic; + httpClient = Client.create(); + } + + } catch (IOException e) { + ErrorLogHelper.logError("AAI_4000", "Error updating dmaap config file for aai event."); + } } @Override public void onMessage(Message message) { - //String jsmMessageTxt = ""; - //String aaiEvent = ""; - //String transId = ""; - //String fromAppId = ""; - //String fullId = ""; - - //if (message instanceof TextMessage) { - //try { - //jsmMessageTxt = ((TextMessage) message).getText(); - //JSONObject jo = new JSONObject(jsmMessageTxt); - - //if (jo.has("aaiEventPayload")) { - //aaiEvent = jo.getJSONObject("aaiEventPayload").toString(); - //} else { - //return; - //} - //if (jo.getString("transId") != null) { - //MDC.put("requestId", jo.getString("transId")); - //} - //if (jo.getString("fromAppId") != null) { - //MDC.put("partnerName", jo.getString("fromAppId")); - //} - //if (jo.getString("fullId") != null) { - //fullId = jo.getString("fullId"); - //} - - //LOGGER.info(fullId + "|" + transId + "|" + fromAppId + "|" + aaiEvent); - - //String environment = System.getProperty("lrmRO"); - - //if (environment == null) { - //this.adp.send(aaiEvent); - //} else { - //if (environment.startsWith("dev") || environment.startsWith("testINT") || environment.startsWith("testEXT")) { - - //WebResource webResource = client.resource(url); - - //ClientResponse response = webResource.accept(contentType).type(MediaType.APPLICATION_JSON).post(ClientResponse.class, aaiEvent); - - //if (response.getStatus() != 200) { - //System.out.println("Failed : HTTP error code : " + response.getStatus()); - //} - //} else { - //this.adp.send(aaiEvent); - //} - //} - - //} catch (IOException e) { - //if (e instanceof java.net.SocketException) { - //if (e.getMessage().contains("Connection reset")) { - //} else { - //ErrorLogHelper.logError("AAI_7304", "Error reaching DMaaP to send event. " + aaiEvent); - //} - //} else { - //ErrorLogHelper.logError("AAI_7304", "Error reaching DMaaP to send event. " + aaiEvent); - //} - //} catch (JMSException | JSONException e) { - //ErrorLogHelper.logError("AAI_7350", "Error parsing aaievent jsm message for sending to dmaap. " + jsmMessageTxt); - //} - //} + String jsmMessageTxt = ""; + String aaiEvent = ""; + String eventName = ""; + + if (message instanceof TextMessage) { + try { + jsmMessageTxt = ((TextMessage) message).getText(); + JSONObject jo = new JSONObject(jsmMessageTxt); + + if (jo.has("aaiEventPayload")) { + aaiEvent = jo.getJSONObject("aaiEventPayload").toString(); + } else { + return; + } + if (jo.getString("transId") != null) { + MDC.put("requestId", jo.getString("transId")); + } + if (jo.getString("fromAppId") != null) { + MDC.put("partnerName", jo.getString("fromAppId")); + } + if (jo.getString("event-topic") != null) { + eventName = jo.getString("event-topic"); + } + + LOGGER.info(eventName + "|" + aaiEvent); + if ("AAI-EVENT".equals(eventName)) { + this.sentWithHttp(this.httpClient, this.aaiEventUrl, aaiEvent); + } + } catch (java.net.SocketException e) { + if (!e.getMessage().contains("Connection reset")) { + LOGGER.error("AAI_7304 Error reaching DMaaP to send event. " + aaiEvent, e); + } + } catch (IOException e) { + LOGGER.error("AAI_7304 Error reaching DMaaP to send event. " + aaiEvent, e); + } catch (JMSException | JSONException e) { + LOGGER.error("AAI_7350 Error parsing aaievent jsm message for sending to dmaap. " + jsmMessageTxt, e); + } catch (Exception e) { + LOGGER.error("AAI_7350 Error sending message to dmaap. " + jsmMessageTxt, e); + } + } + + } + + private boolean sentWithHttp(Client client, String url, String aaiEvent) throws IOException { + + WebResource webResource = client.resource(url); + + ClientResponse response = webResource + .accept(MediaType.APPLICATION_JSON) + .type(MediaType.APPLICATION_JSON) + .post(ClientResponse.class, aaiEvent); + + if (response.getStatus() != 200) { + LOGGER.info("Failed : HTTP error code : " + response.getStatus()); + return false; + } + return true; } } diff --git a/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSProducer.java b/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSProducer.java index 8b27f3a..305e585 100644 --- a/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSProducer.java +++ b/aai-traversal/src/main/java/org/openecomp/aai/dmaap/AAIDmaapEventJMSProducer.java @@ -20,25 +20,25 @@ package org.openecomp.aai.dmaap; -//import org.apache.activemq.ActiveMQConnectionFactory; -//import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; import org.json.JSONObject; -//import org.springframework.jms.connection.CachingConnectionFactory; -//import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.connection.CachingConnectionFactory; +import org.springframework.jms.core.JmsTemplate; public class AAIDmaapEventJMSProducer { - //private JmsTemplate jmsTemplate; + private JmsTemplate jmsTemplate; public AAIDmaapEventJMSProducer() { - //this.jmsTemplate = new JmsTemplate(); - //this.jmsTemplate.setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61446"))); - //this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE")); + this.jmsTemplate = new JmsTemplate(); + this.jmsTemplate.setConnectionFactory(new CachingConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61446"))); + this.jmsTemplate.setDefaultDestination(new ActiveMQQueue("IN_QUEUE")); } public void sendMessageToDefaultDestination(JSONObject finalJson) { - //jmsTemplate.convertAndSend(finalJson.toString()); - //CachingConnectionFactory ccf = (CachingConnectionFactory)this.jmsTemplate.getConnectionFactory(); - //ccf.destroy(); + jmsTemplate.convertAndSend(finalJson.toString()); + CachingConnectionFactory ccf = (CachingConnectionFactory)this.jmsTemplate.getConnectionFactory(); + ccf.destroy(); } } |