diff options
Diffstat (limited to 'src/main/java')
4 files changed, 245 insertions, 12 deletions
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java index 32b44e4..f237776 100644 --- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java +++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java @@ -39,6 +39,7 @@ import com.att.nsa.mr.client.impl.MRConsumerImpl; import com.att.nsa.mr.client.impl.MRMetaClient; import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher; import com.att.nsa.mr.test.clients.ProtocolTypeConstants; +import com.att.nsa.mr.tools.ValidatorUtil; /** * A factory for MR clients.<br/> @@ -483,7 +484,7 @@ public class MRClientFactory { MRSimplerBatchPublisher pub; if (withResponse) { pub = new MRSimplerBatchPublisher.Builder() - .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))) + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")),MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType")) .onTopic(props.getProperty("topic")) .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())) @@ -492,7 +493,7 @@ public class MRClientFactory { .withResponse(withResponse).build(); } else { pub = new MRSimplerBatchPublisher.Builder() - .againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))) + .againstUrlsOrServiceName(MRConsumerImpl.stringToList(props.getProperty("host")), MRConsumerImpl.stringToList(props.getProperty("ServiceName")), props.getProperty("TransportType")) .onTopic(props.getProperty("topic")) .batchTo(Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())) @@ -512,13 +513,16 @@ public class MRClientFactory { } pub.setProtocolFlag(props.getProperty("TransportType")); pub.setProps(props); - routeFilePath = props.getProperty("DME2preferredRouterFilePath"); - routeReader = new FileReader(new File(routeFilePath)); prop = new Properties(); - File fo = new File(routeFilePath); - if (!fo.exists()) { - routeWriter = new FileWriter(new File(routeFilePath)); + if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.DME2.getValue())) { + routeFilePath = props.getProperty("DME2preferredRouterFilePath"); + routeReader = new FileReader(new File(routeFilePath)); + File fo = new File(routeFilePath); + if (!fo.exists()) { + routeWriter = new FileWriter(new File(routeFilePath)); + } } + // pub.setContentType(contentType); return pub; } @@ -623,6 +627,7 @@ public class MRClientFactory { public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException { int timeout; + ValidatorUtil.validateSubscriber(props); if (props.getProperty("timeout") != null) timeout = Integer.parseInt(props.getProperty("timeout")); else diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java index b54fedb..faa81ce 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java @@ -127,6 +127,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { try { // getLog().info ( "Receiving msgs from: " + // url+subContextPath ); + long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); + //String reply = sender.sendAndWait(timeout); String reply = sender.sendAndWait(timeoutMs + 10000L); final JSONObject o = getResponseDataInJson(reply); // msgs.add(reply); @@ -362,6 +364,11 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { private HashMap<String, String> DMETimeOuts; private String handlers; public static final String routerFilePath = null; + private long dme2ReplyHandlerTimeoutMs; + private long longPollingMs; + private static final long defaultDme2PerEndPointTimeoutMs = 10000L; + private static final long defaultDme2ReplyHandlerTimeoutMs = 10000L; + public static String getRouterFilePath() { return routerFilePath; @@ -388,6 +395,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { } private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException { + this.longPollingMs = timeoutMs; latitude = props.getProperty("Latitude"); longitude = props.getProperty("Longitude"); version = props.getProperty("Version"); @@ -459,7 +467,38 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit"); // SSL changes - sender = new DME2Client(new URI(url), timeoutMs + 10000L); + long dme2PerEndPointTimeoutMs; + try { + dme2PerEndPointTimeoutMs = Long.parseLong(props.getProperty("DME2_PER_HANDLER_TIMEOUT_MS")); + //backward compatibility + if ( dme2PerEndPointTimeoutMs <= 0) { + dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs; + } + } catch (NumberFormatException nfe) { + //backward compatibility + dme2PerEndPointTimeoutMs = timeoutMs + defaultDme2PerEndPointTimeoutMs; + getLog().debug("DME2_PER_HANDLER_TIMEOUT_MS not set and using default " + defaultDme2PerEndPointTimeoutMs); + } + + try { + dme2ReplyHandlerTimeoutMs = Long.parseLong(props.getProperty("DME2_REPLY_HANDLER_TIMEOUT_MS")); + } catch (NumberFormatException nfe) { + try { + long dme2EpReadTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS")); + long dme2EpConnTimeoutMs = Long.parseLong(props.getProperty("AFT_DME2_EP_CONN_TIMEOUT")); + dme2ReplyHandlerTimeoutMs = timeoutMs + dme2EpReadTimeoutMs + dme2EpConnTimeoutMs; + getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default from timeoutMs, AFT_DME2_EP_READ_TIMEOUT_MS and AFT_DME2_EP_CONN_TIMEOUT " + dme2ReplyHandlerTimeoutMs); + }catch (NumberFormatException e) { + //backward compatibility + dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs; + getLog().debug("DME2_REPLY_HANDLER_TIMEOUT_MS not set and using default " + dme2ReplyHandlerTimeoutMs); + } + } + //backward compatibility + if ( dme2ReplyHandlerTimeoutMs <= 0) { + dme2ReplyHandlerTimeoutMs = timeoutMs + defaultDme2ReplyHandlerTimeoutMs; + } + sender = new DME2Client(new URI(url),dme2PerEndPointTimeoutMs); sender.setAllowAllHttpReturnCodes(true); sender.setMethod(methodType); sender.setSubContext(subContextPath); @@ -469,7 +508,7 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { sender.setHeaders(DMETimeOuts); sender.setPayload(""); - if (handlers.equalsIgnoreCase("yes")) { + if (handlers!=null&&handlers.equalsIgnoreCase("yes")) { sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS")); @@ -594,7 +633,8 @@ public class MRConsumerImpl extends MRBaseClient implements MRConsumer { if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) { DMEConfigure(timeoutMs, limit); - String reply = sender.sendAndWait(timeoutMs + 10000L); + long timeout = (dme2ReplyHandlerTimeoutMs > 0 && longPollingMs == timeoutMs) ? dme2ReplyHandlerTimeoutMs : (timeoutMs+ defaultDme2ReplyHandlerTimeoutMs); + String reply = sender.sendAndWait(timeout); final JSONObject o = getResponseDataInJsonWithResponseReturned(reply); diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java index 2f7680b..5bb5087 100644 --- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java +++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java @@ -67,6 +67,14 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP fUrls = baseUrls; return this; } + + public Builder againstUrlsOrServiceName ( Collection<String> baseUrls, Collection<String> serviceName, String transportype ) + { + fUrls = baseUrls; + fServiceName = serviceName; + fTransportype = transportype; + return this; + } public Builder onTopic(String topic) { fTopic = topic; @@ -119,6 +127,8 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP } private Collection<String> fUrls; + private Collection<String> fServiceName; + private String fTransportype; private String fTopic; private int fMaxBatchSize = 100; private long fMaxBatchAgeMs = 1000; @@ -278,7 +288,9 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP final long nowMs = Clock.now(); - host = this.fHostSelector.selectBaseHost(); + if (this.fHostSelector != null) { + host = this.fHostSelector.selectBaseHost(); + } final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"), props.getProperty("partition")); @@ -796,7 +808,7 @@ public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingP sender.setSubContext(subContextPath); sender.setCredentials(dmeuser, dmepassword); sender.setHeaders(DMETimeOuts); - if (handlers.equalsIgnoreCase("yes")) { + if (handlers != null &&handlers.equalsIgnoreCase("yes")) { sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS")); sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", diff --git a/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java new file mode 100644 index 0000000..900c932 --- /dev/null +++ b/src/main/java/com/att/nsa/mr/tools/ValidatorUtil.java @@ -0,0 +1,176 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ +package com.att.nsa.mr.tools; + +import java.util.Properties; + +import com.att.nsa.mr.test.clients.ProtocolTypeConstants; + +public class ValidatorUtil { + + public static void validatePublisher(Properties props) { + String transportType = props.getProperty("TransportType"); + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) { + validateForDME2(props); + } else { + validateForNonDME2(props); + } + String maxBatchSize = props.getProperty("maxBatchSize"); + if (maxBatchSize == null || maxBatchSize.isEmpty()) { + throw new IllegalArgumentException ( "maxBatchSize is needed" ); + } + String maxAgeMs = props.getProperty("maxAgeMs"); + if (maxAgeMs == null || maxAgeMs.isEmpty()) { + throw new IllegalArgumentException ( "maxAgeMs is needed" ); + } + String messageSentThreadOccurance = props.getProperty("MessageSentThreadOccurance"); + if (messageSentThreadOccurance == null || messageSentThreadOccurance.isEmpty()) { + throw new IllegalArgumentException ( "MessageSentThreadOccurance is needed" ); + } + + } + + public static void validateSubscriber(Properties props) { + String transportType = props.getProperty("TransportType"); + if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(transportType)) { + validateForDME2(props); + } else { + validateForNonDME2(props); + } + String group = props.getProperty("group"); + if (group == null || group.isEmpty()) { + throw new IllegalArgumentException ( "group is needed" ); + } + String id = props.getProperty("id"); + if (id == null || id.isEmpty()) { + throw new IllegalArgumentException ( "Consumer (Id) is needed" ); + } + String sessionstickinessrequired = props.getProperty("sessionstickinessrequired"); + if (sessionstickinessrequired == null || sessionstickinessrequired.isEmpty()) { + throw new IllegalArgumentException ( "sessionstickinessrequired is needed" ); + } + } + + private static void validateForDME2(Properties props) { + String serviceName = props.getProperty("ServiceName"); + if (serviceName == null || serviceName.isEmpty()) { + throw new IllegalArgumentException ( "Servicename is needed" ); + } + String topic = props.getProperty("topic"); + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException ( "topic is needed" ); + } + String username = props.getProperty("username"); + if (username == null || username.isEmpty()) { + throw new IllegalArgumentException ( "username is needed" ); + } + String password = props.getProperty("password"); + if (password == null || password.isEmpty()) { + throw new IllegalArgumentException ( "password is needed" ); + } + String dME2preferredRouterFilePath = props.getProperty("DME2preferredRouterFilePath"); + if (dME2preferredRouterFilePath == null || dME2preferredRouterFilePath.isEmpty()) { + throw new IllegalArgumentException ( "DME2preferredRouterFilePath is needed" ); + } + String partner = props.getProperty("Partner"); + String routeOffer = props.getProperty("routeOffer"); + if ((partner == null || partner.isEmpty()) && (routeOffer == null || routeOffer.isEmpty())) { + throw new IllegalArgumentException ( "Partner or routeOffer is needed" ); + } + String protocol = props.getProperty("Protocol"); + if (protocol == null || protocol.isEmpty()) { + throw new IllegalArgumentException ( "Protocol is needed" ); + } + String methodType = props.getProperty("MethodType"); + if (methodType == null || methodType.isEmpty()) { + throw new IllegalArgumentException ( "MethodType is needed" ); + } + String contenttype = props.getProperty("contenttype"); + if (contenttype == null || contenttype.isEmpty()) { + throw new IllegalArgumentException ( "contenttype is needed" ); + } + String latitude = props.getProperty("Latitude"); + if (latitude == null || latitude.isEmpty()) { + throw new IllegalArgumentException ( "Latitude is needed" ); + } + String longitude = props.getProperty("Longitude"); + if (longitude == null || longitude.isEmpty()) { + throw new IllegalArgumentException ( "Longitude is needed" ); + } + String aftEnv = props.getProperty("AFT_ENVIRONMENT"); + if (aftEnv == null || aftEnv.isEmpty()) { + throw new IllegalArgumentException ( "AFT_ENVIRONMENT is needed" ); + } + String version = props.getProperty("Version"); + if (version == null || version.isEmpty()) { + throw new IllegalArgumentException ( "Version is needed" ); + } + String environment = props.getProperty("Environment"); + if (environment == null || environment.isEmpty()) { + throw new IllegalArgumentException ( "Environment is needed" ); + } + String subContextPath = props.getProperty("SubContextPath"); + if (subContextPath == null || subContextPath.isEmpty()) { + throw new IllegalArgumentException ( "SubContextPath is needed" ); + } + } + + private static void validateForNonDME2(Properties props) { + String transportType = props.getProperty("TransportType"); + String host = props.getProperty("host"); + if (host == null || host.isEmpty()) { + throw new IllegalArgumentException ( "Servicename is needed" ); + } + String topic = props.getProperty("topic"); + if (topic == null || topic.isEmpty()) { + throw new IllegalArgumentException ( "topic is needed" ); + } + String methodType = props.getProperty("MethodType"); + if (methodType == null || methodType.isEmpty()) { + throw new IllegalArgumentException ( "MethodType is needed" ); + } + String contenttype = props.getProperty("contenttype"); + if (contenttype == null || contenttype.isEmpty()) { + throw new IllegalArgumentException ( "contenttype is needed" ); + } + String username = props.getProperty("username"); + if (username == null || username.isEmpty()) { + throw new IllegalArgumentException ( "username is needed" ); + } + String password = props.getProperty("password"); + if (password == null || password.isEmpty()) { + throw new IllegalArgumentException ( "password is needed" ); + } + if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(transportType)) { + String authKey = props.getProperty("authKey"); + if (authKey == null || authKey.isEmpty()) { + throw new IllegalArgumentException ( "authKey is needed" ); + } + String authDate = props.getProperty("authDate"); + if (authDate == null || authDate.isEmpty()) { + throw new IllegalArgumentException ( "password is needed" ); + } + + } + } + +} |