diff options
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java | 81 |
1 files changed, 79 insertions, 2 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java index f62c1bca60..4fb4122984 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DmaapClientFactory.java @@ -20,12 +20,14 @@ package org.openecomp.sdc.be.components.distribution.engine; +import com.att.nsa.mr.client.MRBatchingPublisher; import com.att.nsa.mr.client.MRClientFactory; import com.att.nsa.mr.client.MRConsumer; import fj.data.Either; +import org.onap.sdc.security.SecurityUtil; import org.openecomp.sdc.be.config.DmaapConsumerConfiguration; +import org.openecomp.sdc.be.config.DmaapProducerConfiguration; import org.openecomp.sdc.common.log.wrappers.Logger; -import org.openecomp.sdc.security.SecurityUtil; import org.springframework.stereotype.Component; import java.io.File; @@ -46,11 +48,24 @@ public class DmaapClientFactory { * @return an instance object of type MRConsumer * @throws IOException */ - public MRConsumer create(DmaapConsumerConfiguration parameters) throws Exception { + public MRConsumer create(DmaapConsumerConfiguration parameters) throws GeneralSecurityException, IOException { MRConsumer consumer = MRClientFactory.createConsumer(buildProperties(parameters)); logger.info("MRConsumer created for topic {}", parameters.getTopic()); return consumer; } + + /** + * Creates DMAAP consumer according to received parameters + * @param parameters + * @return an instance object of type MRConsumer + * @throws IOException + */ + public MRBatchingPublisher createProducer(DmaapProducerConfiguration parameters) throws Exception { + Properties prop = buildProducerProperties(parameters); + MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(prop); + logger.info("MRBatchingPublisher created for topic {}", parameters.getTopic()); + return producer; + } private Properties buildProperties(DmaapConsumerConfiguration parameters) throws GeneralSecurityException, IOException { Properties props = new Properties(); @@ -80,6 +95,13 @@ public class DmaapClientFactory { props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", Integer.toString(parameters.getAftDme2ConnectionTimeoutMs())); props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", Integer.toString(parameters.getAftDme2RoundtripTimeoutMs())); props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", Integer.toString(parameters.getAftDme2ReadTimeoutMs())); + + props.setProperty("AFT_DME2_SSL_ENABLE", Boolean.toString(parameters.isAftDme2SslEnable())); + props.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", Boolean.toString(parameters.isAftDme2ClientIgnoreSslConfig())); + props.setProperty("AFT_DME2_CLIENT_KEYSTORE", parameters.getAftDme2ClientKeystore()); + props.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", parameters.getAftDme2ClientKeystorePassword()); + props.setProperty("AFT_DME2_CLIENT_SSL_CERT_ALIAS", parameters.getAftDme2ClientSslCertAlias()); + String dme2PreferredRouterFilePath = parameters.getDme2preferredRouterFilePath(); ensureFileExists(dme2PreferredRouterFilePath); @@ -98,6 +120,61 @@ public class DmaapClientFactory { return props; } + private Properties buildProducerProperties(DmaapProducerConfiguration parameters) throws GeneralSecurityException, IOException { + logger.info("The DmaapProducerConfiguration is {} ", parameters); + Properties props = new Properties(); + Either<String,String> passkey = SecurityUtil.INSTANCE.decrypt(parameters.getCredential().getPassword() ); + if (passkey.isRight()){ + throw new GeneralSecurityException("invalid password, cannot build properties"); + } + props.setProperty("Latitude", Double.toString(parameters.getLatitude())); + props.setProperty("Longitude", Double.toString(parameters.getLongitude())); + props.setProperty("Version", parameters.getVersion()); + props.setProperty("ServiceName", parameters.getServiceName()); + props.setProperty("Environment", parameters.getEnvironment()); + props.setProperty("Partner", parameters.getPartner()); + props.setProperty("routeOffer", parameters.getRouteOffer()); + props.setProperty("Protocol", parameters.getProtocol()); + props.setProperty("username", parameters.getCredential().getUsername()); + props.setProperty("password", passkey.left().value() ); + props.setProperty("contenttype", parameters.getContenttype()); + props.setProperty("host", parameters.getHosts()); + props.setProperty("topic", parameters.getTopic()); + props.setProperty("group", parameters.getConsumerGroup()); + props.setProperty("id", parameters.getConsumerId()); + props.setProperty("AFT_DME2_REQ_TRACE_ON", Boolean.toString(parameters.isDme2TraceOn())); + props.setProperty("AFT_ENVIRONMENT", parameters.getAftEnvironment()); + props.setProperty("AFT_DME2_EP_CONN_TIMEOUT", Integer.toString(parameters.getAftDme2ConnectionTimeoutMs())); + props.setProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", Integer.toString(parameters.getAftDme2RoundtripTimeoutMs())); + props.setProperty("AFT_DME2_EP_READ_TIMEOUT_MS", Integer.toString(parameters.getAftDme2ReadTimeoutMs())); + + props.setProperty("AFT_DME2_SSL_ENABLE", Boolean.toString(parameters.isAftDme2SslEnable())); + props.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", Boolean.toString(parameters.isAftDme2ClientIgnoreSslConfig())); + props.setProperty("AFT_DME2_CLIENT_KEYSTORE", parameters.getAftDme2ClientKeystore()); + props.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", parameters.getAftDme2ClientKeystorePassword()); + props.setProperty("AFT_DME2_CLIENT_SSL_CERT_ALIAS", parameters.getAftDme2ClientSslCertAlias()); + + String dme2PreferredRouterFilePath = parameters.getDme2preferredRouterFilePath(); + ensureFileExists(dme2PreferredRouterFilePath); + props.setProperty("DME2preferredRouterFilePath", dme2PreferredRouterFilePath); + props.setProperty("TransportType", "HTTPAAF"); + props.setProperty("SubContextPath", "/"); + props.setProperty("MethodType", "POST"); + props.setProperty("authKey", ""); + props.setProperty("authDate", ""); + props.setProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", ""); + props.setProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS", ""); + props.setProperty("sessionstickinessrequired", "no"); + + props.setProperty("maxBatchSize","1"); + props.setProperty("maxAgeMs","250"); + props.setProperty("partition","1"); + props.setProperty("MessageSentThreadOccurance","10"); + props.setProperty("Authorization","Basic bTEzMzMxQGNjZC5hdHQuY29tOkFhMTIzNDU2"); + + return props; + } + private void ensureFileExists(String filePath) throws IOException { File file = new File(filePath); if(file.createNewFile()) { |