aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java')
-rw-r--r--src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java939
1 files changed, 939 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java
new file mode 100644
index 0000000..10eef5e
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -0,0 +1,939 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.HostSelector;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants;
+
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
+{
+ public static class Builder
+ {
+ public Builder ()
+ {
+ }
+
+ public Builder againstUrls ( Collection<String> baseUrls )
+ {
+ fUrls = baseUrls;
+ return this;
+ }
+
+ public Builder onTopic ( String topic )
+ {
+ fTopic = topic;
+ return this;
+ }
+
+ public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
+ {
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ return this;
+ }
+
+ public Builder compress ( boolean compress )
+ {
+ fCompress = compress;
+ return this;
+ }
+
+ public Builder httpThreadTime ( int threadOccuranceTime )
+ {
+ this.threadOccuranceTime = threadOccuranceTime;
+ return this;
+ }
+
+ public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
+ {
+ fAllowSelfSignedCerts = allowSelfSignedCerts;
+ return this;
+ }
+
+ public Builder withResponse ( boolean withResponse)
+ {
+ fWithResponse = withResponse;
+ return this;
+ }
+ public MRSimplerBatchPublisher build ()
+ {
+ if(!fWithResponse)
+ {
+ try {
+ return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ } else
+ {
+ try {
+ return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+
+ private Collection<String> fUrls;
+ private String fTopic;
+ private int fMaxBatchSize = 100;
+ private long fMaxBatchAgeMs = 1000;
+ private boolean fCompress = false;
+ private int threadOccuranceTime = 50;
+ private boolean fAllowSelfSignedCerts = false;
+ private boolean fWithResponse = false;
+
+ };
+
+ @Override
+ public int send ( String partition, String msg )
+ {
+ return send ( new message ( partition, msg ) );
+ }
+ @Override
+ public int send ( String msg )
+ {
+ return send ( new message ( null, msg ) );
+ }
+
+
+ @Override
+ public int send ( message msg )
+ {
+ final LinkedList<message> list = new LinkedList<message> ();
+ list.add ( msg );
+ return send ( list );
+ }
+
+
+
+ @Override
+ public synchronized int send ( Collection<message> msgs )
+ {
+ if ( fClosed )
+ {
+ throw new IllegalStateException ( "The publisher was closed." );
+ }
+
+ for ( message userMsg : msgs )
+ {
+ fPending.add ( new TimestampedMessage ( userMsg ) );
+ }
+ return getPendingMessageCount ();
+ }
+
+ @Override
+ public synchronized int getPendingMessageCount ()
+ {
+ return fPending.size ();
+ }
+
+ @Override
+ public void close ()
+ {
+ try
+ {
+ final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
+ if ( remains.size() > 0 )
+ {
+ getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ catch ( IOException e )
+ {
+ getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ }
+
+ @Override
+ public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
+ {
+ synchronized ( this )
+ {
+ fClosed = true;
+
+ // stop the background sender
+ fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
+ fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
+ fExec.shutdown ();
+ }
+
+ final long now = Clock.now ();
+ final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+ final long timeoutAtMs = now + waitInMs;
+
+ while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
+ {
+ send ( true );
+ Thread.sleep ( 250 );
+ }
+
+ synchronized ( this )
+ {
+ final LinkedList<message> result = new LinkedList<message> ();
+ fPending.drainTo ( result );
+ return result;
+ }
+ }
+
+ /**
+ * Possibly send a batch to the MR server. This is called by the background thread
+ * and the close() method
+ *
+ * @param force
+ */
+ private synchronized void send ( boolean force )
+ {
+ if ( force || shouldSendNow () )
+ {
+ if ( !sendBatch () )
+ {
+ getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+ }
+ }
+ }
+
+ private synchronized boolean shouldSendNow ()
+ {
+ boolean shouldSend = false;
+ if ( fPending.size () > 0 )
+ {
+ final long nowMs = Clock.now ();
+
+ shouldSend = ( fPending.size() >= fMaxBatchSize );
+ if ( !shouldSend )
+ {
+ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, wait after an error
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ }
+ return shouldSend;
+ }
+
+ private synchronized boolean sendBatch ()
+ {
+ // it's possible for this call to be made with an empty list. in this case, just return.
+ if ( fPending.size() < 1 )
+ {
+ return true;
+ }
+
+ final long nowMs = Clock.now ();
+
+ host = this.fHostSelector.selectBaseHost();
+
+ final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
+
+
+ try
+ {
+ /*final String contentType =
+ fCompress ?
+ MRFormat.CAMBRIA_ZIP.toString () :
+ MRFormat.CAMBRIA.toString ()
+ ;*/
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+ OutputStream os = baseStream;
+ final String contentType = props.getProperty("contenttype");
+ if(contentType.equalsIgnoreCase("application/json")){
+ JSONArray jsonArray = new JSONArray();
+ for ( TimestampedMessage m : fPending )
+ {
+ JSONObject jsonObject = new JSONObject(m.fMsg);
+
+ jsonArray.put(jsonObject);
+
+ }
+ os.write (jsonArray.toString().getBytes() );
+ os.close();
+
+ }else if (contentType.equalsIgnoreCase("text/plain")){
+ for ( TimestampedMessage m : fPending )
+ {
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ os.close ();
+ } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
+ if ( contentType.equalsIgnoreCase("application/cambria-zip") )
+ {
+ os = new GZIPOutputStream ( baseStream );
+ }
+ for ( TimestampedMessage m : fPending )
+ {
+
+ os.write ( ( "" + m.fPartition.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( ( "" + m.fMsg.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( m.fPartition.getBytes() );
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ os.close ();
+ }else{
+ for ( TimestampedMessage m : fPending )
+ {
+ os.write ( m.fMsg.getBytes() );
+
+ }
+ os.close ();
+ }
+
+
+
+ final long startMs = Clock.now ();
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ sender.setPayload(os.toString());
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
+ + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
+ //System.out.println(result.getInt("status"));
+ //Here we are checking for error response. If HTTP status
+ //code is not within the http success response code
+ //then we consider this as error and return false
+ if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
+
+
+ //System.out.println(result.getInt("status"));
+ //Here we are checking for error response. If HTTP status
+ //code is not within the http success response code
+ //then we consider this as error and return false
+ if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+ }
+ catch ( IllegalArgumentException x ) {
+ getLog().warn ( x.getMessage(), x );
+ } catch ( IOException x ) {
+ getLog().warn ( x.getMessage(), x );
+ } catch (HttpException x) {
+ getLog().warn ( x.getMessage(), x );
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ return false;
+ }
+
+ public synchronized MRPublisherResponse sendBatchWithResponse ()
+ {
+ // it's possible for this call to be made with an empty list. in this case, just return.
+ if ( fPending.size() < 1 )
+ {
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage("No Messages to send");
+ return pubResponse;
+ }
+
+ final long nowMs = Clock.now ();
+
+ host = this.fHostSelector.selectBaseHost();
+
+ final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
+ OutputStream os=null;
+ try
+ {
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+ os = baseStream;
+ final String contentType = props.getProperty("contenttype");
+ if(contentType.equalsIgnoreCase("application/json")){
+ JSONArray jsonArray = new JSONArray();
+ for ( TimestampedMessage m : fPending )
+ {
+ JSONObject jsonObject = new JSONObject(m.fMsg);
+
+ jsonArray.put(jsonObject);
+
+ }
+ os.write (jsonArray.toString().getBytes() );
+ }else if (contentType.equalsIgnoreCase("text/plain")){
+ for ( TimestampedMessage m : fPending )
+ {
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
+ if ( contentType.equalsIgnoreCase("application/cambria-zip") )
+ {
+ os = new GZIPOutputStream ( baseStream );
+ }
+ for ( TimestampedMessage m : fPending )
+ {
+
+ os.write ( ( "" + m.fPartition.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( ( "" + m.fMsg.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( m.fPartition.getBytes() );
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ os.close ();
+ }else{
+ for ( TimestampedMessage m : fPending )
+ {
+ os.write ( m.fMsg.getBytes() );
+
+ }
+ }
+
+
+
+ final long startMs = Clock.now ();
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+
+ try {
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ sender.setPayload(os.toString());
+
+
+ String dmeResponse = sender.sendAndWait(5000L);
+ System.out.println("dmeres->"+dmeResponse);
+
+
+ pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
+
+ if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+ final String logLine = String.valueOf((Clock.now() - startMs))
+ + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+
+ }
+ catch (DME2Exception x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(x.getErrorCode());
+ pubResponse.setResponseMessage(x.getErrorMessage());
+ } catch (URISyntaxException x) {
+
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+ } catch (Exception x) {
+
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ }
+
+ return pubResponse;
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
+ //System.out.println(result.getInt("status"));
+ //Here we are checking for error response. If HTTP status
+ //code is not within the http success response code
+ //then we consider this as error and return false
+
+
+ pubResponse = createMRPublisherResponse(result,pubResponse);
+
+ if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
+
+ //System.out.println(result.getInt("status"));
+ //Here we are checking for error response. If HTTP status
+ //code is not within the http success response code
+ //then we consider this as error and return false
+ pubResponse = createMRPublisherResponse(result,pubResponse);
+
+ if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+ return pubResponse;
+ }
+
+ final String logLine = String.valueOf((Clock.now() - startMs));
+ getLog().info(logLine);
+ fPending.clear();
+ return pubResponse;
+ }
+ }
+ catch ( IllegalArgumentException x ) {
+ getLog().warn ( x.getMessage(), x );
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch ( IOException x ) {
+ getLog().warn ( x.getMessage(), x );
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch (HttpException x) {
+ getLog().warn ( x.getMessage(), x );
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage(x.getMessage());
+
+ }
+
+ finally {
+ if (fPending.size()>0) {
+ getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+ pubResponse.setPendingMsgs(fPending.size());
+ }
+ if (os != null) {
+ try {
+ os.close();
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+ pubResponse.setResponseMessage("Error in closing Output Stream");
+ }
+ }
+ }
+
+ return pubResponse;
+ }
+
+private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+ if (reply.isEmpty())
+ {
+
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+ mrPubResponse.setResponseMessage("Please verify the Producer properties");
+ }
+ else if(reply.startsWith("{"))
+ {
+ JSONObject jObject = new JSONObject(reply);
+ if(jObject.has("message") && jObject.has("status"))
+ {
+ String message = jObject.getString("message");
+ if(null != message)
+ {
+ mrPubResponse.setResponseMessage(message);
+ }
+ mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+ }
+ else
+ {
+ mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrPubResponse.setResponseMessage(reply);
+ }
+ }
+ else if (reply.startsWith("<"))
+ {
+ String responseCode = getHTTPErrorResponseCode(reply);
+ if( responseCode.contains("403"))
+ {
+ responseCode = "403";
+ }
+ mrPubResponse.setResponseCode(responseCode);
+ mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ }
+
+ return mrPubResponse;
+ }
+
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxBatchAgeMs;
+ private final boolean fCompress;
+ private int threadOccuranceTime;
+ private boolean fClosed;
+ private String username;
+ private String password;
+ private String host;
+
+ //host selector
+ private HostSelector fHostSelector = null;
+
+ private final LinkedBlockingQueue<TimestampedMessage> fPending;
+ private long fDontSendUntilMs;
+ private final ScheduledThreadPoolExecutor fExec;
+
+ private String latitude;
+ private String longitude;
+ private String version;
+ private String serviceName;
+ private String env;
+ private String partner;
+ private String routeOffer;
+ private String subContextPath;
+ private String protocol;
+ private String methodType;
+ private String url;
+ private String dmeuser;
+ private String dmepassword;
+ private String contentType;
+ private static final long sfWaitAfterError = 10000;
+ private HashMap<String, String> DMETimeOuts;
+ private DME2Client sender;
+ public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ public String producerFilePath;
+ private String authKey;
+ private String authDate;
+ private String handlers;
+ private Properties props;
+ public static String routerFilePath;
+ public static Map<String, String> headers=new HashMap<String, String>();
+ public static MultivaluedMap<String, Object> headersMap;
+
+
+ private MRPublisherResponse pubResponse;
+
+ public MRPublisherResponse getPubResponse() {
+ return pubResponse;
+ }
+ public void setPubResponse(MRPublisherResponse pubResponse) {
+ this.pubResponse = pubResponse;
+ }
+
+ public static String getRouterFilePath() {
+ return routerFilePath;
+ }
+
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+ }
+
+ public Properties getProps() {
+ return props;
+ }
+
+ public void setProps(Properties props) {
+ this.props = props;
+ }
+
+ public String getProducerFilePath() {
+ return producerFilePath;
+ }
+
+ public void setProducerFilePath(String producerFilePath) {
+ this.producerFilePath = producerFilePath;
+ }
+
+ public String getProtocolFlag() {
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) {
+ this.protocolFlag = protocolFlag;
+ }
+
+
+ private void DME2Configue() throws Exception {
+ try {
+
+ /* FileReader reader = new FileReader(new File (producerFilePath));
+ Properties props = new Properties();
+ props.load(reader);*/
+ latitude = props.getProperty("Latitude");
+ longitude = props.getProperty("Longitude");
+ version = props.getProperty("Version");
+ serviceName = props.getProperty("ServiceName");
+ env = props.getProperty("Environment");
+ partner = props.getProperty("Partner");
+ routeOffer = props.getProperty("routeOffer");
+ subContextPath = props.getProperty("SubContextPath")+fTopic;
+ /*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
+ subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
+ }*/
+ protocol = props.getProperty("Protocol");
+ methodType = props.getProperty("MethodType");
+ dmeuser = props.getProperty("username");
+ dmepassword = props.getProperty("password");
+ contentType = props.getProperty("contenttype");
+ handlers = props.getProperty("sessionstickinessrequired");
+ routerFilePath= props.getProperty("DME2preferredRouterFilePath");
+
+ /**
+ * Changes to DME2Client url to use Partner for auto failover between data centers
+ * When Partner value is not provided use the routeOffer value for auto failover within a cluster
+ */
+
+
+ String partitionKey = props.getProperty("partition");
+
+ if (partner != null && !partner.isEmpty() )
+ {
+ url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
+ if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ }
+ else if (routeOffer!=null && !routeOffer.isEmpty())
+ {
+ url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
+ if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
+ url = url + "&partitionKey=" + partitionKey;
+ }
+ }
+
+ DMETimeOuts = new HashMap<String, String>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("Content-Type", contentType);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
+ //System.setProperty("DME2.DEBUG", "true");
+ // System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
+ //System.out.println("XXXXXX"+url);
+
+ //SSL changes
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+ "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+
+ //SSL changes
+
+ sender = new DME2Client(new URI(url), 5000L);
+
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ sender.setCredentials(dmeuser, dmepassword);
+ sender.setHeaders(DMETimeOuts);
+ if(handlers.equalsIgnoreCase("yes")){
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ }else{
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client.HeaderReplyHandler");
+ }
+ } catch (DME2Exception x) {
+ getLog().warn(x.getMessage(), x);
+ throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
+ } catch (URISyntaxException x) {
+
+ getLog().warn(x.getMessage(), x);
+ throw new URISyntaxException(url,x.getMessage());
+ } catch (Exception x) {
+
+ getLog().warn(x.getMessage(), x);
+ throw new Exception(x.getMessage());
+ }
+ }
+
+ private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
+ {
+ super ( hosts );
+
+ if ( topic == null || topic.length() < 1 )
+ {
+ throw new IllegalArgumentException ( "A topic must be provided." );
+ }
+
+ fHostSelector = new HostSelector(hosts, null);
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+
+ fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+ fDontSendUntilMs = 0;
+ fExec = new ScheduledThreadPoolExecutor ( 1 );
+ pubResponse = new MRPublisherResponse();
+
+ }
+
+ private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
+ {
+ super ( hosts );
+
+ if ( topic == null || topic.length() < 1 )
+ {
+ throw new IllegalArgumentException ( "A topic must be provided." );
+ }
+
+ fHostSelector = new HostSelector(hosts, null);
+ fClosed = false;
+ fTopic = topic;
+ fMaxBatchSize = maxBatchSize;
+ fMaxBatchAgeMs = maxBatchAgeMs;
+ fCompress = compress;
+ threadOccuranceTime=httpThreadOccurnace;
+ fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+ fDontSendUntilMs = 0;
+ fExec = new ScheduledThreadPoolExecutor ( 1 );
+ fExec.scheduleAtFixedRate ( new Runnable()
+ {
+ @Override
+ public void run ()
+ {
+ send ( false );
+ }
+ }, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+ }
+
+ private static class TimestampedMessage extends message
+ {
+ public TimestampedMessage ( message m )
+ {
+ super ( m );
+ timestamp = Clock.now();
+ }
+ public final long timestamp;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public void setContentType(String contentType) {
+ this.contentType = contentType;
+ }
+
+ public String getAuthKey() {
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) {
+ this.authKey = authKey;
+ }
+
+ public String getAuthDate() {
+ return authDate;
+ }
+
+ public void setAuthDate(String authDate) {
+ this.authDate = authDate;
+ }
+
+}