aboutsummaryrefslogtreecommitdiffstats
path: root/sdnr/wt/mountpoint-registrar/provider/src/main
diff options
context:
space:
mode:
authorhighstreetherbert <herbert.eiselt@highstreet-technologies.com>2020-07-10 18:55:57 +0200
committerhighstreetherbert <herbert.eiselt@highstreet-technologies.com>2020-07-11 20:24:17 +0200
commit5f5644378ff7510a6c7c1190e93e997abc8cbc25 (patch)
treeb55f22f5ca5422d939f073cee832300df5ec7610 /sdnr/wt/mountpoint-registrar/provider/src/main
parent23c27ddcd79913d11eac16eb42c5a43899de97a1 (diff)
Reformat sdnr mountpoint-registrar to ONAP code style
Reformat to ONAP code style Issue-ID: SDNC-1281 Signed-off-by: highstreetherbert <herbert.eiselt@highstreet-technologies.com> Change-Id: I284542212bc2b7c505bf939239f291526f2ae8ac Signed-off-by: highstreetherbert <herbert.eiselt@highstreet-technologies.com>
Diffstat (limited to 'sdnr/wt/mountpoint-registrar/provider/src/main')
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java148
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java177
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java18
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java258
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java294
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java281
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java142
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java105
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java180
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java209
-rw-r--r--sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java228
11 files changed, 1055 insertions, 985 deletions
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java
index 14d31d578..4e6cc996d 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPFaultVESMsgConsumer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -30,75 +30,77 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.pro
public class DMaaPFaultVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPFaultVESMsgConsumer.class);
-
- //private static int faultCounter = 0;
- private static final String DEFAULT_SDNRUSER = "admin";
- private static final String DEFAULT_SDNRPASSWD = "admin";
-
- @Override
- public void processMsg(String msg) throws Exception {
- String faultNodeId;
- String faultOccurrenceTime;
- String faultObjectId;
- String faultReason;
- String faultSeverity;
- int faultSequence;
- ObjectMapper oMapper = new ObjectMapper();
- JsonNode dmaapMessageRootNode;
-
- LOG.info("Fault VES Message is - {}", msg);
- try {
- dmaapMessageRootNode = oMapper.readTree(msg);
- faultNodeId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
- faultOccurrenceTime = dmaapMessageRootNode.at("/event/faultFields/alarmAdditionalInformation/eventTime").textValue();
- faultObjectId = dmaapMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
- faultReason = dmaapMessageRootNode.at("/event/faultFields/specificProblem").textValue();
- faultSeverity = dmaapMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
- faultSequence = dmaapMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
- //faultCounter++;
-
- if (faultSeverity.equalsIgnoreCase("critical")) {
- faultSeverity = SeverityType.Critical.toString();
- } else if (faultSeverity.equalsIgnoreCase("major")) {
- faultSeverity = SeverityType.Major.toString();
- } else if (faultSeverity.equalsIgnoreCase("minor")) {
- faultSeverity = SeverityType.Minor.toString();
- } else if (faultSeverity.equalsIgnoreCase("warning")) {
- faultSeverity = SeverityType.Warning.toString();
- } else if (faultSeverity.equalsIgnoreCase("nonalarmed")) {
- faultSeverity = SeverityType.NonAlarmed.toString();
- } else {
- faultSeverity = SeverityType.NonAlarmed.toString();
- }
-
- String baseUrl = getBaseUrl();
- String sdnrUser = getSDNRUser();
- String sdnrPasswd = getSDNRPasswd();
-
- FaultNotificationClient faultClient = getFaultNotificationClient(baseUrl);
- faultClient.setAuthorization(sdnrUser, sdnrPasswd);
- faultClient.sendFaultNotification(faultNodeId, Integer.toString(faultSequence), faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
-
- } catch (IOException e) {
- LOG.info("Cannot parse json object ");
- throw new Exception("Cannot parse json object", e);
- }
- }
-
- public String getBaseUrl() {
- return GeneralConfig.getBaseUrl();
- }
-
- public String getSDNRUser() {
- return GeneralConfig.getSDNRUser()!= null?GeneralConfig.getSDNRUser():DEFAULT_SDNRUSER;
- }
-
- public String getSDNRPasswd() {
- return GeneralConfig.getSDNRPasswd()!= null?GeneralConfig.getSDNRPasswd():DEFAULT_SDNRPASSWD;
- }
-
- public FaultNotificationClient getFaultNotificationClient(String baseUrl) {
- return new FaultNotificationClient(baseUrl);
- }
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPFaultVESMsgConsumer.class);
+
+ //private static int faultCounter = 0;
+ private static final String DEFAULT_SDNRUSER = "admin";
+ private static final String DEFAULT_SDNRPASSWD = "admin";
+
+ @Override
+ public void processMsg(String msg) throws Exception {
+ String faultNodeId;
+ String faultOccurrenceTime;
+ String faultObjectId;
+ String faultReason;
+ String faultSeverity;
+ int faultSequence;
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode dmaapMessageRootNode;
+
+ LOG.info("Fault VES Message is - {}", msg);
+ try {
+ dmaapMessageRootNode = oMapper.readTree(msg);
+ faultNodeId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
+ faultOccurrenceTime =
+ dmaapMessageRootNode.at("/event/faultFields/alarmAdditionalInformation/eventTime").textValue();
+ faultObjectId = dmaapMessageRootNode.at("/event/faultFields/alarmInterfaceA").textValue();
+ faultReason = dmaapMessageRootNode.at("/event/faultFields/specificProblem").textValue();
+ faultSeverity = dmaapMessageRootNode.at("/event/faultFields/eventSeverity").textValue();
+ faultSequence = dmaapMessageRootNode.at("/event/commonEventHeader/sequence").intValue();
+ //faultCounter++;
+
+ if (faultSeverity.equalsIgnoreCase("critical")) {
+ faultSeverity = SeverityType.Critical.toString();
+ } else if (faultSeverity.equalsIgnoreCase("major")) {
+ faultSeverity = SeverityType.Major.toString();
+ } else if (faultSeverity.equalsIgnoreCase("minor")) {
+ faultSeverity = SeverityType.Minor.toString();
+ } else if (faultSeverity.equalsIgnoreCase("warning")) {
+ faultSeverity = SeverityType.Warning.toString();
+ } else if (faultSeverity.equalsIgnoreCase("nonalarmed")) {
+ faultSeverity = SeverityType.NonAlarmed.toString();
+ } else {
+ faultSeverity = SeverityType.NonAlarmed.toString();
+ }
+
+ String baseUrl = getBaseUrl();
+ String sdnrUser = getSDNRUser();
+ String sdnrPasswd = getSDNRPasswd();
+
+ FaultNotificationClient faultClient = getFaultNotificationClient(baseUrl);
+ faultClient.setAuthorization(sdnrUser, sdnrPasswd);
+ faultClient.sendFaultNotification(faultNodeId, Integer.toString(faultSequence), faultOccurrenceTime,
+ faultObjectId, faultReason, faultSeverity);
+
+ } catch (IOException e) {
+ LOG.info("Cannot parse json object ");
+ throw new Exception("Cannot parse json object", e);
+ }
+ }
+
+ public String getBaseUrl() {
+ return GeneralConfig.getBaseUrl();
+ }
+
+ public String getSDNRUser() {
+ return GeneralConfig.getSDNRUser() != null ? GeneralConfig.getSDNRUser() : DEFAULT_SDNRUSER;
+ }
+
+ public String getSDNRPasswd() {
+ return GeneralConfig.getSDNRPasswd() != null ? GeneralConfig.getSDNRPasswd() : DEFAULT_SDNRPASSWD;
+ }
+
+ public FaultNotificationClient getFaultNotificationClient(String baseUrl) {
+ return new FaultNotificationClient(baseUrl);
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java
index 00d7e9514..ed9f583a5 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPPNFRegVESMsgConsumer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt mountpoint-registrar
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -28,90 +28,95 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class DMaaPPNFRegVESMsgConsumer extends DMaaPVESMsgConsumerImpl {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPPNFRegVESMsgConsumer.class);
- private static final String DEFAULT_PROTOCOL = "SSH";
- private static final String DEFAULT_PORT = "17830";
- private static final String DEFAULT_USERNAME = "netconf";
- private static final String DEFAULT_PASSWORD = "netconf";
- private static final String DEFAULT_SDNRUSER = "admin";
- private static final String DEFAULT_SDNRPASSWD = "admin";
-
- @Override
- public void processMsg(String msg) {
- LOG.debug("Message from DMaaP topic is - {} ",msg);
- String pnfId;
- String pnfIPv4Address;
- String pnfCommProtocol;
- String pnfCommPort;
- String pnfKeyId = null;
- String pnfUsername;
- String pnfPasswd = null;
- ObjectMapper oMapper = new ObjectMapper();
- JsonNode dmaapMessageRootNode;
- try {
- dmaapMessageRootNode = oMapper.readTree(msg);
- pnfId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
- pnfIPv4Address = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue();
- pnfCommProtocol = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue();
- pnfCommPort = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue();
- if (pnfCommProtocol != null) {
- if (pnfCommProtocol.equalsIgnoreCase("TLS")) {
- // Read username and keyId
- pnfKeyId = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue();
- pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username").textValue();
- } else if (pnfCommProtocol.equalsIgnoreCase("SSH")) {
- // Read username and password
- pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username").textValue();
- pnfPasswd = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password").textValue();
- } else {
- // log warning - Unknown protocol
- LOG.warn("Only SSH and TLS protocols supported. Protocol specified in VES message is - {}",pnfCommProtocol,". Defaulting to SSH");
- pnfCommProtocol = DEFAULT_PROTOCOL;
- pnfCommPort = DEFAULT_PORT;
- pnfUsername = DEFAULT_USERNAME;
- pnfPasswd = DEFAULT_PASSWORD;
- }
- } else {
- LOG.warn("Protocol not specified in VES message, Defaulting to SSH");
- pnfCommProtocol = DEFAULT_PROTOCOL;
- pnfCommPort = DEFAULT_PORT;
- pnfUsername = DEFAULT_USERNAME;
- pnfPasswd = DEFAULT_PASSWORD;
- }
-
- LOG.debug("PNF Fields - {} : {} : {} : {} : {} : {} : {}",pnfId,pnfIPv4Address,pnfCommProtocol,pnfKeyId,pnfUsername,pnfPasswd,pnfCommPort);
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPPNFRegVESMsgConsumer.class);
+ private static final String DEFAULT_PROTOCOL = "SSH";
+ private static final String DEFAULT_PORT = "17830";
+ private static final String DEFAULT_USERNAME = "netconf";
+ private static final String DEFAULT_PASSWORD = "netconf";
+ private static final String DEFAULT_SDNRUSER = "admin";
+ private static final String DEFAULT_SDNRPASSWD = "admin";
- String baseUrl = getBaseUrl();
- String sdnrUser = getSDNRUser();
- String sdnrPasswd = getSDNRPasswd();
+ @Override
+ public void processMsg(String msg) {
+ LOG.debug("Message from DMaaP topic is - {} ", msg);
+ String pnfId;
+ String pnfIPv4Address;
+ String pnfCommProtocol;
+ String pnfCommPort;
+ String pnfKeyId = null;
+ String pnfUsername;
+ String pnfPasswd = null;
+ ObjectMapper oMapper = new ObjectMapper();
+ JsonNode dmaapMessageRootNode;
+ try {
+ dmaapMessageRootNode = oMapper.readTree(msg);
+ pnfId = dmaapMessageRootNode.at("/event/commonEventHeader/sourceName").textValue();
+ pnfIPv4Address = dmaapMessageRootNode.at("/event/pnfRegistrationFields/oamV4IpAddress").textValue();
+ pnfCommProtocol =
+ dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/protocol").textValue();
+ pnfCommPort = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/oamPort").textValue();
+ if (pnfCommProtocol != null) {
+ if (pnfCommProtocol.equalsIgnoreCase("TLS")) {
+ // Read username and keyId
+ pnfKeyId =
+ dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/keyId").textValue();
+ pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
+ .textValue();
+ } else if (pnfCommProtocol.equalsIgnoreCase("SSH")) {
+ // Read username and password
+ pnfUsername = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/username")
+ .textValue();
+ pnfPasswd = dmaapMessageRootNode.at("/event/pnfRegistrationFields/additionalFields/password")
+ .textValue();
+ } else {
+ // log warning - Unknown protocol
+ LOG.warn("Only SSH and TLS protocols supported. Protocol specified in VES message is - {}",
+ pnfCommProtocol, ". Defaulting to SSH");
+ pnfCommProtocol = DEFAULT_PROTOCOL;
+ pnfCommPort = DEFAULT_PORT;
+ pnfUsername = DEFAULT_USERNAME;
+ pnfPasswd = DEFAULT_PASSWORD;
+ }
+ } else {
+ LOG.warn("Protocol not specified in VES message, Defaulting to SSH");
+ pnfCommProtocol = DEFAULT_PROTOCOL;
+ pnfCommPort = DEFAULT_PORT;
+ pnfUsername = DEFAULT_USERNAME;
+ pnfPasswd = DEFAULT_PASSWORD;
+ }
- PNFMountPointClient mountpointClient = getPNFMountPointClient(baseUrl);
- LOG.debug("Setting RESTConf Authorization values - {} : {}",sdnrUser,sdnrPasswd);
- mountpointClient.setAuthorization(sdnrUser, sdnrPasswd);
-
- mountpointClient.pnfMountPointCreate(pnfId,
- pnfIPv4Address,
- pnfCommProtocol, pnfKeyId,
- pnfUsername, pnfPasswd,
- pnfCommPort);
- } catch (IOException e) {
- LOG.info("Cannot parse json object, ignoring the received PNF Registration VES Message. Reason: {}",e.getMessage());
- }
- }
-
- public String getBaseUrl() {
- return GeneralConfig.getBaseUrl();
- }
-
- public String getSDNRUser() {
- return GeneralConfig.getSDNRUser()!= null?GeneralConfig.getSDNRUser():DEFAULT_SDNRUSER;
- }
-
- public String getSDNRPasswd() {
- return GeneralConfig.getSDNRPasswd()!= null?GeneralConfig.getSDNRPasswd():DEFAULT_SDNRPASSWD;
- }
-
- public PNFMountPointClient getPNFMountPointClient(String baseUrl) {
- return new PNFMountPointClient(baseUrl);
- }
+ LOG.debug("PNF Fields - {} : {} : {} : {} : {} : {} : {}", pnfId, pnfIPv4Address, pnfCommProtocol, pnfKeyId,
+ pnfUsername, pnfPasswd, pnfCommPort);
+
+ String baseUrl = getBaseUrl();
+ String sdnrUser = getSDNRUser();
+ String sdnrPasswd = getSDNRPasswd();
+
+ PNFMountPointClient mountpointClient = getPNFMountPointClient(baseUrl);
+ LOG.debug("Setting RESTConf Authorization values - {} : {}", sdnrUser, sdnrPasswd);
+ mountpointClient.setAuthorization(sdnrUser, sdnrPasswd);
+
+ mountpointClient.pnfMountPointCreate(pnfId, pnfIPv4Address, pnfCommProtocol, pnfKeyId, pnfUsername,
+ pnfPasswd, pnfCommPort);
+ } catch (IOException e) {
+ LOG.info("Cannot parse json object, ignoring the received PNF Registration VES Message. Reason: {}",
+ e.getMessage());
+ }
+ }
+
+ public String getBaseUrl() {
+ return GeneralConfig.getBaseUrl();
+ }
+
+ public String getSDNRUser() {
+ return GeneralConfig.getSDNRUser() != null ? GeneralConfig.getSDNRUser() : DEFAULT_SDNRUSER;
+ }
+
+ public String getSDNRPasswd() {
+ return GeneralConfig.getSDNRPasswd() != null ? GeneralConfig.getSDNRPasswd() : DEFAULT_SDNRPASSWD;
+ }
+
+ public PNFMountPointClient getPNFMountPointClient(String baseUrl) {
+ return new PNFMountPointClient(baseUrl);
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java
index 8474bbb0b..25f737583 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumer.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt mountpoint-registrar
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -22,15 +22,15 @@ import java.util.Properties;
public abstract interface DMaaPVESMsgConsumer extends Runnable {
- //public abstract void init(Properties baseProperties, String consumerPropertiesPath);
- public abstract void init(Properties baseProperties);
+ //public abstract void init(Properties baseProperties, String consumerPropertiesPath);
+ public abstract void init(Properties baseProperties);
- public abstract void processMsg(String msg) throws Exception;//Implement something like InvalidMessageException;
+ public abstract void processMsg(String msg) throws Exception;//Implement something like InvalidMessageException;
- public abstract boolean isReady();
+ public abstract boolean isReady();
- public abstract boolean isRunning();
-
- public abstract void stopConsumer();
+ public abstract boolean isRunning();
+
+ public abstract void stopConsumer();
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
index 1164a9a88..66cce840f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerImpl.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt mountpoint-registrar
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -29,131 +29,133 @@ import org.onap.dmaap.mr.client.response.MRConsumerResponse;
public abstract class DMaaPVESMsgConsumerImpl implements DMaaPVESMsgConsumer {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
-
- private final String name = this.getClass().getSimpleName();
- private Properties properties = null;
- private MRConsumer consumer = null;
- private MRConsumerResponse consumerResponse = null;
- private boolean running = false;
- private boolean ready = false;
- private int fetchPause = 5000; // Default pause between fetch - 5 seconds
- private int timeout = 15000; // Default timeout - 15 seconds
-
- protected DMaaPVESMsgConsumerImpl() {
-
- }
-
- /*
- * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns.
- * If no data arrives on the topic, sleeps for a certain time period before checking again
- */
- @Override
- public void run() {
-
- if (ready) {
- running = true;
- while (running) {
- try {
- boolean noData = true;
- consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
- for (String msg : consumerResponse.getActualMessages()) {
- noData = false;
- LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n"+msg);
- processMsg(msg);
- }
-
- if (noData) {
- LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
- LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
- if ((consumerResponse.getResponseCode() == null) && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
- LOG.warn("Client timeout while waiting for response from Server {}", consumerResponse.getResponseMessage());
- }
- pauseThread();
- }
- } catch (Exception e) {
- LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
- running = false;
- }
- }
- }
- }
-
- /*
- * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
- */
- @Override
- public void init(Properties properties) {
-
- try {
-
- String timeoutStr = properties.getProperty("timeout");
- LOG.debug("timeoutStr: " + timeoutStr);
-
- if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
- timeout = parseTimeOutValue(timeoutStr);
- }
-
- String fetchPauseStr = properties.getProperty("fetchPause");
- LOG.debug("fetchPause(Str): " + fetchPauseStr);
- if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
- fetchPause = parseFetchPause(fetchPauseStr);
- }
- LOG.debug("fetchPause: " + fetchPause);
-
- this.consumer = MRClientFactory.createConsumer(properties);
- ready = true;
- } catch (Exception e) {
- LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
- }
- }
-
- private int parseTimeOutValue(String timeoutStr) {
- try {
- return Integer.parseInt(timeoutStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
- }
- return timeout;
- }
-
- private int parseFetchPause(String fetchPauseStr) {
- try {
- return Integer.parseInt(fetchPauseStr);
- } catch (NumberFormatException e) {
- LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
- }
- return fetchPause;
- }
-
- private void pauseThread() throws InterruptedException {
- if (fetchPause > 0) {
- LOG.debug(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
- Thread.sleep(fetchPause);
- } else {
- LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately");
- }
- }
-
- @Override
- public boolean isReady() {
- return ready;
- }
-
- @Override
- public boolean isRunning() {
- return running;
- }
-
- public String getProperty(String name) {
- return properties.getProperty(name, "");
- }
-
- @Override
- public void stopConsumer() {
- running = false;
- }
-
- public abstract void processMsg(String msg) throws Exception;
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerImpl.class);
+
+ private final String name = this.getClass().getSimpleName();
+ private Properties properties = null;
+ private MRConsumer consumer = null;
+ private MRConsumerResponse consumerResponse = null;
+ private boolean running = false;
+ private boolean ready = false;
+ private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+ private int timeout = 15000; // Default timeout - 15 seconds
+
+ protected DMaaPVESMsgConsumerImpl() {
+
+ }
+
+ /*
+ * Thread to fetch messages from the DMaaP topic. Waits for the messages to arrive on the topic until a certain timeout and returns.
+ * If no data arrives on the topic, sleeps for a certain time period before checking again
+ */
+ @Override
+ public void run() {
+
+ if (ready) {
+ running = true;
+ while (running) {
+ try {
+ boolean noData = true;
+ consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
+ for (String msg : consumerResponse.getActualMessages()) {
+ noData = false;
+ LOG.debug(name + " received ActualMessage from DMaaP VES Message topic:\n" + msg);
+ processMsg(msg);
+ }
+
+ if (noData) {
+ LOG.debug(name + " received ResponseCode: " + consumerResponse.getResponseCode());
+ LOG.debug(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
+ if ((consumerResponse.getResponseCode() == null)
+ && (consumerResponse.getResponseMessage().contains("SocketTimeoutException"))) {
+ LOG.warn("Client timeout while waiting for response from Server {}",
+ consumerResponse.getResponseMessage());
+ }
+ pauseThread();
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception reading from DMaaP VES Message Topic", e);
+ running = false;
+ }
+ }
+ }
+ }
+
+ /*
+ * Create a consumer by specifying properties containing information such as topic name, timeout, URL etc
+ */
+ @Override
+ public void init(Properties properties) {
+
+ try {
+
+ String timeoutStr = properties.getProperty("timeout");
+ LOG.debug("timeoutStr: " + timeoutStr);
+
+ if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
+ timeout = parseTimeOutValue(timeoutStr);
+ }
+
+ String fetchPauseStr = properties.getProperty("fetchPause");
+ LOG.debug("fetchPause(Str): " + fetchPauseStr);
+ if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
+ fetchPause = parseFetchPause(fetchPauseStr);
+ }
+ LOG.debug("fetchPause: " + fetchPause);
+
+ this.consumer = MRClientFactory.createConsumer(properties);
+ ready = true;
+ } catch (Exception e) {
+ LOG.error("Error initializing DMaaP VES Message consumer from file " + properties, e);
+ }
+ }
+
+ private int parseTimeOutValue(String timeoutStr) {
+ try {
+ return Integer.parseInt(timeoutStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
+ }
+ return timeout;
+ }
+
+ private int parseFetchPause(String fetchPauseStr) {
+ try {
+ return Integer.parseInt(fetchPauseStr);
+ } catch (NumberFormatException e) {
+ LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
+ }
+ return fetchPause;
+ }
+
+ private void pauseThread() throws InterruptedException {
+ if (fetchPause > 0) {
+ LOG.debug(String.format("No data received from fetch. Pausing %d ms before retry", fetchPause));
+ Thread.sleep(fetchPause);
+ } else {
+ LOG.debug("No data received from fetch. No fetch pause specified - retrying immediately");
+ }
+ }
+
+ @Override
+ public boolean isReady() {
+ return ready;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running;
+ }
+
+ public String getProperty(String name) {
+ return properties.getProperty(name, "");
+ }
+
+ @Override
+ public void stopConsumer() {
+ running = false;
+ }
+
+ public abstract void processMsg(String msg) throws Exception;
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
index 26afd19aa..7b851aa5c 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/DMaaPVESMsgConsumerMain.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt mountpoint-registrar
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -29,143 +29,157 @@ import org.slf4j.LoggerFactory;
public class DMaaPVESMsgConsumerMain implements Runnable {
- private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
-
- boolean threadsRunning = false;
- static List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
- public GeneralConfig config;
- public PNFRegistrationConfig pnfRegistrationConfig;
- public FaultConfig faultConfig;
-
- public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap) {
- configMap.forEach((k, v) -> initialize(k, v));
- }
-
- public void initialize(String domain, Configuration domainConfig) {
- LOG.debug("In initialize method : Domain = {} and domainConfig = {}",domain,domainConfig);
- String consumerClass = null;
- Properties consumerProperties = new Properties();
- if (domain.equalsIgnoreCase("pnfregistration")) {
- this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
-
- consumerClass = pnfRegistrationConfig.getConsumerClass();
- LOG.debug("Consumer class = "+consumerClass);
-
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, pnfRegistrationConfig.getTransportType());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, pnfRegistrationConfig.getHostPort());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, pnfRegistrationConfig.getContenttype());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP, pnfRegistrationConfig.getConsumerGroup());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID, pnfRegistrationConfig.getConsumerId());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, pnfRegistrationConfig.getTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, pnfRegistrationConfig.getFetchPause());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, pnfRegistrationConfig.getProtocol());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME, pnfRegistrationConfig.getUsername());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD, pnfRegistrationConfig.getPassword());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, pnfRegistrationConfig.getClientReadTimeout());
- consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, pnfRegistrationConfig.getClientConnectTimeout());
- } else if (domain.equalsIgnoreCase("fault")) {
- this.faultConfig = (FaultConfig) domainConfig;
- consumerClass = faultConfig.getConsumerClass();
- LOG.debug("Consumer class = {}",consumerClass);
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, faultConfig.getClientReadTimeout());
- consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, faultConfig.getClientConnectTimeout());
- }
-
- if (consumerClass != null) {
- LOG.info("Calling createConsumer : {}",consumerClass);
- threadsRunning = createConsumer(consumerClass, consumerProperties);
- }
-
- }
-
- private static boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
- boolean threadsRunning = false;
- for (DMaaPVESMsgConsumer consumer : consumers) {
- if (consumer.isRunning()) {
- threadsRunning = true;
- }
- }
- return threadsRunning;
- }
-
- static boolean createConsumer(String consumerClassName, Properties properties) {
- Class<?> consumerClass = null;
-
- try {
- consumerClass = Class.forName(consumerClassName);
- } catch (Exception e) {
- LOG.error("Could not find DMaap VES Message consumer class {}", consumerClassName, e);
- }
-
- if (consumerClass != null) {
- LOG.debug("Calling handleConsumerClass");
- handleConsumerClass(consumerClass, consumerClassName, properties, consumers);
- }
- return !consumers.isEmpty();
- }
-
- private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, Properties properties, List<DMaaPVESMsgConsumer> consumers) {
- DMaaPVESMsgConsumer consumer = null;
-
- try {
- consumer = (DMaaPVESMsgConsumer) consumerClass.newInstance();
- LOG.debug("Successfully created an instance of consumerClass : {}",consumerClassName);
- } catch (Exception e) {
- LOG.error("Could not create consumer from class {}",consumerClassName, e);
- }
-
- if (consumer != null) {
- LOG.info("Initializing consumer {}({})", consumerClassName, properties);
- consumer.init(properties);
-
- if (consumer.isReady()) {
- Thread consumerThread = new Thread(consumer);
- consumerThread.start();
- consumers.add(consumer);
-
- LOG.info("Started consumer thread ({} : {})", consumerClassName,
- properties);
- return true;
- } else {
- LOG.debug("Consumer {} is not ready", consumerClassName);
- }
- }
- return false;
- }
-
- public void run() {
- while (threadsRunning) {
- threadsRunning = updateThreadState(consumers);
- if (!threadsRunning) {
- break;
- }
-
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- LOG.error(e.getLocalizedMessage(), e);
- }
- }
-
- LOG.info("No listener threads running - exiting");
- }
-
- public static List<DMaaPVESMsgConsumer> getConsumers() {
- return consumers;
- }
+ private static final Logger LOG = LoggerFactory.getLogger(DMaaPVESMsgConsumerMain.class);
+
+ boolean threadsRunning = false;
+ static List<DMaaPVESMsgConsumer> consumers = new LinkedList<>();
+ public GeneralConfig config;
+ public PNFRegistrationConfig pnfRegistrationConfig;
+ public FaultConfig faultConfig;
+
+ public DMaaPVESMsgConsumerMain(Map<String, Configuration> configMap) {
+ configMap.forEach((k, v) -> initialize(k, v));
+ }
+
+ public void initialize(String domain, Configuration domainConfig) {
+ LOG.debug("In initialize method : Domain = {} and domainConfig = {}", domain, domainConfig);
+ String consumerClass = null;
+ Properties consumerProperties = new Properties();
+ if (domain.equalsIgnoreCase("pnfregistration")) {
+ this.pnfRegistrationConfig = (PNFRegistrationConfig) domainConfig;
+
+ consumerClass = pnfRegistrationConfig.getConsumerClass();
+ LOG.debug("Consumer class = " + consumerClass);
+
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ pnfRegistrationConfig.getTransportType());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_HOST_PORT,
+ pnfRegistrationConfig.getHostPort());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ pnfRegistrationConfig.getContenttype());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_GROUP,
+ pnfRegistrationConfig.getConsumerGroup());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_ID,
+ pnfRegistrationConfig.getConsumerId());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TOPIC, pnfRegistrationConfig.getTopic());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_TIMEOUT,
+ pnfRegistrationConfig.getTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_LIMIT, pnfRegistrationConfig.getLimit());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ pnfRegistrationConfig.getFetchPause());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PROTOCOL,
+ pnfRegistrationConfig.getProtocol());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_USERNAME,
+ pnfRegistrationConfig.getUsername());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_PASSWORD,
+ pnfRegistrationConfig.getPassword());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ pnfRegistrationConfig.getClientReadTimeout());
+ consumerProperties.put(PNFRegistrationConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ pnfRegistrationConfig.getClientConnectTimeout());
+ } else if (domain.equalsIgnoreCase("fault")) {
+ this.faultConfig = (FaultConfig) domainConfig;
+ consumerClass = faultConfig.getConsumerClass();
+ LOG.debug("Consumer class = {}", consumerClass);
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, faultConfig.getTransportType());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_HOST_PORT, faultConfig.getHostPort());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CONTENTTYPE, faultConfig.getContenttype());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_GROUP, faultConfig.getConsumerGroup());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_ID, faultConfig.getConsumerId());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TOPIC, faultConfig.getTopic());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_TIMEOUT, faultConfig.getTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_LIMIT, faultConfig.getLimit());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_FETCHPAUSE, faultConfig.getFetchPause());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PROTOCOL, faultConfig.getProtocol());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_USERNAME, faultConfig.getUsername());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_PASSWORD, faultConfig.getPassword());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ faultConfig.getClientReadTimeout());
+ consumerProperties.put(FaultConfig.PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ faultConfig.getClientConnectTimeout());
+ }
+
+ if (consumerClass != null) {
+ LOG.info("Calling createConsumer : {}", consumerClass);
+ threadsRunning = createConsumer(consumerClass, consumerProperties);
+ }
+
+ }
+
+ private static boolean updateThreadState(List<DMaaPVESMsgConsumer> consumers) {
+ boolean threadsRunning = false;
+ for (DMaaPVESMsgConsumer consumer : consumers) {
+ if (consumer.isRunning()) {
+ threadsRunning = true;
+ }
+ }
+ return threadsRunning;
+ }
+
+ static boolean createConsumer(String consumerClassName, Properties properties) {
+ Class<?> consumerClass = null;
+
+ try {
+ consumerClass = Class.forName(consumerClassName);
+ } catch (Exception e) {
+ LOG.error("Could not find DMaap VES Message consumer class {}", consumerClassName, e);
+ }
+
+ if (consumerClass != null) {
+ LOG.debug("Calling handleConsumerClass");
+ handleConsumerClass(consumerClass, consumerClassName, properties, consumers);
+ }
+ return !consumers.isEmpty();
+ }
+
+ private static boolean handleConsumerClass(Class<?> consumerClass, String consumerClassName, Properties properties,
+ List<DMaaPVESMsgConsumer> consumers) {
+ DMaaPVESMsgConsumer consumer = null;
+
+ try {
+ consumer = (DMaaPVESMsgConsumer) consumerClass.newInstance();
+ LOG.debug("Successfully created an instance of consumerClass : {}", consumerClassName);
+ } catch (Exception e) {
+ LOG.error("Could not create consumer from class {}", consumerClassName, e);
+ }
+
+ if (consumer != null) {
+ LOG.info("Initializing consumer {}({})", consumerClassName, properties);
+ consumer.init(properties);
+
+ if (consumer.isReady()) {
+ Thread consumerThread = new Thread(consumer);
+ consumerThread.start();
+ consumers.add(consumer);
+
+ LOG.info("Started consumer thread ({} : {})", consumerClassName, properties);
+ return true;
+ } else {
+ LOG.debug("Consumer {} is not ready", consumerClassName);
+ }
+ }
+ return false;
+ }
+
+ public void run() {
+ while (threadsRunning) {
+ threadsRunning = updateThreadState(consumers);
+ if (!threadsRunning) {
+ break;
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ }
+ }
+
+ LOG.info("No listener threads running - exiting");
+ }
+
+ public static List<DMaaPVESMsgConsumer> getConsumers() {
+ return consumers;
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java
index dc0ed9cdd..a6323f270 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultConfig.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,153 +14,168 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class FaultConfig implements Configuration {
- private static final String SECTION_MARKER = "fault";
+ private static final String SECTION_MARKER = "fault";
- private static final String PROPERTY_KEY_CONSUMER_CLASS = "faultConsumerClass";
- private static final String DEFAULT_VALUE_CONSUMER_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
+ private static final String PROPERTY_KEY_CONSUMER_CLASS = "faultConsumerClass";
+ private static final String DEFAULT_VALUE_CONSUMER_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPFaultVESMsgConsumer";
+
+ public static final String PROPERTY_KEY_CONSUMER_TRANSPORTTYPE = "TransportType";
+ private static final String DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE = "HTTPNOAUTH";
- public static final String PROPERTY_KEY_CONSUMER_TRANSPORTTYPE = "TransportType";
- private static final String DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE = "HTTPNOAUTH";
-
public static final String PROPERTY_KEY_CONSUMER_PROTOCOL = "Protocol";
private static final String DEFAULT_VALUE_CONSUMER_PROTOCOL = "http";
-
+
public static final String PROPERTY_KEY_CONSUMER_USERNAME = "username";
private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "username";
-
+
public static final String PROPERTY_KEY_CONSUMER_PASSWORD = "password";
private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "password";
- public static final String PROPERTY_KEY_CONSUMER_HOST_PORT = "host";
- private static final String DEFAULT_VALUE_CONSUMER_HOST_PORT = "onap-dmaap:3904";
-
- public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic";
- private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_FAULT_OUTPUT";
-
- public static final String PROPERTY_KEY_CONSUMER_CONTENTTYPE = "contenttype";
- private static final String DEFAULT_VALUE_CONSUMER_CONTENTTYPE = "application/json";
-
- public static final String PROPERTY_KEY_CONSUMER_GROUP = "group";
- private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG";
-
- public static final String PROPERTY_KEY_CONSUMER_ID = "id";
- private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
-
- public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
- private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
-
- public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
- private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
-
- public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
- private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT="jersey.config.client.readTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT="25000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT="jersey.config.client.connectTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT="25000";
-
- private final ConfigurationFileRepresentation configuration;
-
- public FaultConfig(ConfigurationFileRepresentation configuration) {
- this.configuration = configuration;
- this.configuration.addSection(SECTION_MARKER);
- defaults();
- }
-
- @Override
- public String getSectionName() {
- return SECTION_MARKER;
- }
-
- @Override
- public void defaults() {
-
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS, DEFAULT_VALUE_CONSUMER_CLASS);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL, DEFAULT_VALUE_CONSUMER_PROTOCOL);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME, DEFAULT_VALUE_CONSUMER_USERNAME);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD, DEFAULT_VALUE_CONSUMER_PASSWORD);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT, DEFAULT_VALUE_CONSUMER_HOST_PORT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, DEFAULT_VALUE_CONSUMER_TOPIC);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE, DEFAULT_VALUE_CONSUMER_CONTENTTYPE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP, DEFAULT_VALUE_CONSUMER_GROUP);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT, DEFAULT_VALUE_CONSUMER_TIMEOUT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT, DEFAULT_VALUE_CONSUMER_LIMIT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE, DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT);
-
- }
-
- public String getConsumerClass() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS);
- }
-
- public String getHostPort() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT);
- }
-
- public String getTransportType() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE);
- }
-
- public String getProtocol() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL);
- }
-
- public String getUsername() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME);
- }
-
- public String getPassword() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD);
- }
-
- public String getTopic() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC);
- }
-
- public String getConsumerGroup() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP);
- }
-
- public String getConsumerId() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID);
- }
-
- public String getTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT);
- }
-
- public String getLimit() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT);
- }
-
- public String getFetchPause() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
- }
-
- public String getContenttype() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE);
- }
-
- public String getClientReadTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT);
- }
-
- public String getClientConnectTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT);
- }
+ public static final String PROPERTY_KEY_CONSUMER_HOST_PORT = "host";
+ private static final String DEFAULT_VALUE_CONSUMER_HOST_PORT = "onap-dmaap:3904";
+
+ public static final String PROPERTY_KEY_CONSUMER_TOPIC = "topic";
+ private static final String DEFAULT_VALUE_CONSUMER_TOPIC = "unauthenticated.SEC_FAULT_OUTPUT";
+
+ public static final String PROPERTY_KEY_CONSUMER_CONTENTTYPE = "contenttype";
+ private static final String DEFAULT_VALUE_CONSUMER_CONTENTTYPE = "application/json";
+
+ public static final String PROPERTY_KEY_CONSUMER_GROUP = "group";
+ private static final String DEFAULT_VALUE_CONSUMER_GROUP = "myG";
+
+ public static final String PROPERTY_KEY_CONSUMER_ID = "id";
+ private static final String DEFAULT_VALUE_CONSUMER_ID = "C1";
+
+ public static final String PROPERTY_KEY_CONSUMER_TIMEOUT = "timeout";
+ private static final String DEFAULT_VALUE_CONSUMER_TIMEOUT = "20000";
+
+ public static final String PROPERTY_KEY_CONSUMER_LIMIT = "limit";
+ private static final String DEFAULT_VALUE_CONSUMER_LIMIT = "10000";
+
+ public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
+ private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
+
+ public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT = "jersey.config.client.readTimeout";
+ private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT = "25000";
+
+ public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT = "jersey.config.client.connectTimeout";
+ private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT = "25000";
+
+ private final ConfigurationFileRepresentation configuration;
+
+ public FaultConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ this.configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS,
+ DEFAULT_VALUE_CONSUMER_CLASS);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL,
+ DEFAULT_VALUE_CONSUMER_PROTOCOL);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
+ DEFAULT_VALUE_CONSUMER_USERNAME);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
+ DEFAULT_VALUE_CONSUMER_PASSWORD);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT,
+ DEFAULT_VALUE_CONSUMER_HOST_PORT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
+ DEFAULT_VALUE_CONSUMER_TOPIC);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ DEFAULT_VALUE_CONSUMER_CONTENTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP,
+ DEFAULT_VALUE_CONSUMER_GROUP);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT,
+ DEFAULT_VALUE_CONSUMER_TIMEOUT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT,
+ DEFAULT_VALUE_CONSUMER_LIMIT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT);
+
+ }
+
+ public String getConsumerClass() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS);
+ }
+
+ public String getHostPort() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT);
+ }
+
+ public String getTransportType() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE);
+ }
+
+ public String getProtocol() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL);
+ }
+
+ public String getUsername() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME);
+ }
+
+ public String getPassword() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD);
+ }
+
+ public String getTopic() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC);
+ }
+
+ public String getConsumerGroup() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP);
+ }
+
+ public String getConsumerId() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID);
+ }
+
+ public String getTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT);
+ }
+
+ public String getLimit() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT);
+ }
+
+ public String getFetchPause() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
+ }
+
+ public String getContenttype() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE);
+ }
+
+ public String getClientReadTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT);
+ }
+
+ public String getClientConnectTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT);
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java
index 76137674a..3fdbb6f20 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/FaultNotificationClient.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
-
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPClient;
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse;
import org.slf4j.Logger;
@@ -30,70 +29,75 @@ import org.slf4j.LoggerFactory;
public class FaultNotificationClient extends BaseHTTPClient {
- private static final Logger LOG = LoggerFactory.getLogger(FaultNotificationClient.class);
- private static final String FAULT_NOTIFICATION_URI = "restconf/operations/devicemanager:push-fault-notification";
- private final Map<String, String> headerMap;
-
- private static final String FAULT_PAYLOAD = "{\n" +
- " \"devicemanager:input\": {\n" +
- " \"devicemanager:node-id\": \"@node-id@\",\n" +
- " \"devicemanager:counter\": \"@counter@\",\n" +
- " \"devicemanager:timestamp\": \"@timestamp@\",\n" +
- " \"devicemanager:object-id\": \"@object-id@\",\n" +
- " \"devicemanager:problem\": \"@problem@\",\n" +
- " \"devicemanager:severity\": \"@severity@\"\n" +
- " }\n" +
- "}";
-
- public FaultNotificationClient(String baseUrl) {
- super(baseUrl);
-
- this.headerMap = new HashMap<>();
- this.headerMap.put("Content-Type", "application/json");
- this.headerMap.put("Accept", "application/json");
- }
-
- public void setAuthorization(String username, String password) {
- String credentials = username + ":" + password;
- this.headerMap.put("Authorization",
- "Basic " + new String(Base64.getEncoder().encode(credentials.getBytes())));
-
- }
-
- public boolean sendFaultNotification(String faultNodeId, String faultCounter, String faultOccurrenceTime, String faultObjectId,
- String faultReason, String faultSeverity) {
- String message = "";
-
- message = updateFaultPayload(faultNodeId, faultCounter, faultOccurrenceTime, faultObjectId, faultReason, faultSeverity);
-
- LOG.debug("Payload after updating values is: "+message);
-
- return sendFaultRequest("POST", message) == 200;
-
- }
-
- private static String updateFaultPayload(String faultNodeId, String faultCounter, String faultOccurrenceTime, String faultObjectId,
- String faultReason, String faultSeverity) {
- return FAULT_PAYLOAD.replace("@node-id@", faultNodeId)
- .replace("@counter@", faultCounter)
- .replace("@timestamp@", faultOccurrenceTime)
- .replace("@object-id@", faultObjectId)
- .replace("@problem@", faultReason)
- .replace("@severity@", faultSeverity);
- }
-
-
- private int sendFaultRequest(String method, String message) {
- LOG.debug("In sendFaultRequest - "+method+ " "+message);
- BaseHTTPResponse response;
- try {
- String uri = FAULT_NOTIFICATION_URI;
- response = this.sendRequest(uri, method, message, headerMap);
- LOG.debug("finished with responsecode {}", response.code);
- return response.code;
- } catch (IOException e) {
- LOG.warn("problem sending fault message {} : {}", e.getMessage());
- return -1;
- }
- }
+ private static final Logger LOG = LoggerFactory.getLogger(FaultNotificationClient.class);
+ private static final String FAULT_NOTIFICATION_URI = "restconf/operations/devicemanager:push-fault-notification";
+ private final Map<String, String> headerMap;
+
+ // @formatter:off
+ private static final String FAULT_PAYLOAD = "{\n"
+ + " \"devicemanager:input\": {\n"
+ + " \"devicemanager:node-id\": \"@node-id@\",\n"
+ + " \"devicemanager:counter\": \"@counter@\",\n"
+ + " \"devicemanager:timestamp\": \"@timestamp@\",\n"
+ + " \"devicemanager:object-id\": \"@object-id@\",\n"
+ + " \"devicemanager:problem\": \"@problem@\",\n"
+ + " \"devicemanager:severity\": \"@severity@\"\n"
+ + " }\n"
+ + "}";
+ // @formatter:on
+
+
+ public FaultNotificationClient(String baseUrl) {
+ super(baseUrl);
+
+ this.headerMap = new HashMap<>();
+ this.headerMap.put("Content-Type", "application/json");
+ this.headerMap.put("Accept", "application/json");
+ }
+
+ public void setAuthorization(String username, String password) {
+ String credentials = username + ":" + password;
+ this.headerMap.put("Authorization", "Basic " + new String(Base64.getEncoder().encode(credentials.getBytes())));
+
+ }
+
+ public boolean sendFaultNotification(String faultNodeId, String faultCounter, String faultOccurrenceTime,
+ String faultObjectId, String faultReason, String faultSeverity) {
+ String message = "";
+
+ message = updateFaultPayload(faultNodeId, faultCounter, faultOccurrenceTime, faultObjectId, faultReason,
+ faultSeverity);
+
+ LOG.debug("Payload after updating values is: " + message);
+
+ return sendFaultRequest("POST", message) == 200;
+
+ }
+
+ private static String updateFaultPayload(String faultNodeId, String faultCounter, String faultOccurrenceTime,
+ String faultObjectId, String faultReason, String faultSeverity) {
+ // @formatter:off
+ return FAULT_PAYLOAD.replace("@node-id@", faultNodeId)
+ .replace("@counter@", faultCounter)
+ .replace("@timestamp@", faultOccurrenceTime)
+ .replace("@object-id@", faultObjectId)
+ .replace("@problem@", faultReason)
+ .replace("@severity@", faultSeverity);
+ // @formatter:on
+ }
+
+
+ private int sendFaultRequest(String method, String message) {
+ LOG.debug("In sendFaultRequest - " + method + " " + message);
+ BaseHTTPResponse response;
+ try {
+ String uri = FAULT_NOTIFICATION_URI;
+ response = this.sendRequest(uri, method, message, headerMap);
+ LOG.debug("finished with responsecode {}", response.code);
+ return response.code;
+ } catch (IOException e) {
+ LOG.warn("problem sending fault message {} : {}", e.getMessage());
+ return -1;
+ }
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java
index 39d688ba2..e8269ccda 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/GeneralConfig.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
@@ -22,66 +22,67 @@ import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRep
/**
* Configuration of mountpoint-registrar, general section<br>
- * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not
- * Generates default Configuration properties if none exist or exist partially
- * Generates Consumer properties only for TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
- * generated by default. For a list of applicable properties for the different TranportType values, please see - https://wiki.onap.org/display/DW/Feature+configuration+requirements
+ * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default
+ * Configuration properties if none exist or exist partially Generates Consumer properties only for
+ * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
+ * generated by default. For a list of applicable properties for the different TranportType values, please see -
+ * https://wiki.onap.org/display/DW/Feature+configuration+requirements
*/
public class GeneralConfig implements Configuration {
private static final String SECTION_MARKER = "general";
- private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled" ; //"enabled";
-
+ private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; //"enabled";
+
private static final String PROPERTY_KEY_USER = "sdnrUser";
private static final String DEFAULT_VALUE_USER = "admin";
-
+
private static final String PROPERTY_KEY_USERPASSWD = "sdnrPasswd";
private static final String DEFAULT_VALUE_USERPASSWD = "admin";
-
+
private static final String PROPERTY_KEY_BASEURL = "baseUrl";
private static final String DEFAULT_VALUE_BASEURL = "http://localhost:8181";
-
-
- private static ConfigurationFileRepresentation configuration;
-
- public GeneralConfig(ConfigurationFileRepresentation configuration) {
- GeneralConfig.configuration = configuration;
- GeneralConfig.configuration.addSection(SECTION_MARKER);
- defaults();
- }
-
- public Boolean getEnabled() {
- Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
- return enabled;
- }
-
- public static String getBaseUrl() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BASEURL);
- }
-
- public static String getSDNRUser() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_USER);
- }
-
- public static String getSDNRPasswd() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_USERPASSWD);
- }
-
- @Override
- public String getSectionName() {
- return SECTION_MARKER;
- }
-
- @Override
- public void defaults() {
- // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BASEURL, DEFAULT_VALUE_BASEURL);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USER, DEFAULT_VALUE_USER);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USERPASSWD, DEFAULT_VALUE_USERPASSWD);
- }
-
-
+
+
+ private static ConfigurationFileRepresentation configuration;
+
+ public GeneralConfig(ConfigurationFileRepresentation configuration) {
+ GeneralConfig.configuration = configuration;
+ GeneralConfig.configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ public Boolean getEnabled() {
+ Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
+ return enabled;
+ }
+
+ public static String getBaseUrl() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_BASEURL);
+ }
+
+ public static String getSDNRUser() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_USER);
+ }
+
+ public static String getSDNRPasswd() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_USERPASSWD);
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+ // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_BASEURL, DEFAULT_VALUE_BASEURL);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USER, DEFAULT_VALUE_USER);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_USERPASSWD, DEFAULT_VALUE_USERPASSWD);
+ }
+
+
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
index a993f23c9..12a992216 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -31,93 +31,95 @@ import org.slf4j.LoggerFactory;
public class MountpointRegistrarImpl implements AutoCloseable, IConfigChangedListener {
- private static final Logger LOG = LoggerFactory.getLogger(MountpointRegistrarImpl.class);
- private static final String APPLICATION_NAME = "mountpoint-registrar";
- private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
-
- private Thread dmaapVESMsgConsumerMain = null;
-
- private GeneralConfig generalConfig;
- private boolean dmaapEnabled = false;
- private Map<String, Configuration> configMap = new HashMap<>();
-
- // Blueprint 1
- public MountpointRegistrarImpl() {
- LOG.info("Creating provider class for {}", APPLICATION_NAME);
- }
-
- public void init() {
- LOG.info("Init call for {}", APPLICATION_NAME);
-
- ConfigurationFileRepresentation configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
- configFileRepresentation.registerConfigChangedListener(this);
-
- generalConfig = new GeneralConfig(configFileRepresentation);
- PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
- FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
-
- configMap.put("pnfRegistration", pnfRegConfig);
- configMap.put("fault", faultConfig);
-
- dmaapEnabled = generalConfig.getEnabled();
- if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
- LOG.info("DMaaP seems to be enabled, starting consumer(s)");
- dmaapVESMsgConsumerMain = new Thread(new DMaaPVESMsgConsumerMain(configMap));
- dmaapVESMsgConsumerMain.start();
- } else {
- LOG.info("DMaaP seems to be disabled, not starting any consumer(s)");
- }
- }
-
- /**
- * Reflect status for Unit Tests
- * @return Text with status
- */
- public String isInitializationOk() {
- return "No implemented";
- }
-
- @Override
- public void onConfigChanged() {
- LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
- boolean dmaapEnabledNewVal = generalConfig.getEnabled();
- if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
- LOG.info("DMaaP is enabled, starting consumer(s)");
- dmaapVESMsgConsumerMain = new Thread(new DMaaPVESMsgConsumerMain(configMap));
- dmaapVESMsgConsumerMain.start();
- } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
- LOG.info("DMaaP is disabled, stopping consumer(s)");
- List<DMaaPVESMsgConsumer> consumers = DMaaPVESMsgConsumerMain.getConsumers();
- for (DMaaPVESMsgConsumer consumer : consumers) {
- // stop all consumers
- consumer.stopConsumer();
- }
- }
- dmaapEnabled = dmaapEnabledNewVal;
- }
-
- @Override
- public void close() throws Exception {
- LOG.info("{} closing ...", this.getClass().getName());
- //close(updateService, configService, mwtnService); issue#1
- //close(updateService, mwtnService);
- LOG.info("{} closing done",APPLICATION_NAME);
- }
-
- /**
- * Used to close all Services, that should support AutoCloseable Pattern
- *
- * @param toClose
- * @throws Exception
- */
- @SuppressWarnings("unused")
- private void close(AutoCloseable... toCloseList) throws Exception {
- for (AutoCloseable element : toCloseList) {
- if (element != null) {
- element.close();
- }
- }
- }
+ private static final Logger LOG = LoggerFactory.getLogger(MountpointRegistrarImpl.class);
+ private static final String APPLICATION_NAME = "mountpoint-registrar";
+ private static final String CONFIGURATIONFILE = "etc/mountpoint-registrar.properties";
+
+ private Thread dmaapVESMsgConsumerMain = null;
+
+ private GeneralConfig generalConfig;
+ private boolean dmaapEnabled = false;
+ private Map<String, Configuration> configMap = new HashMap<>();
+
+ // Blueprint 1
+ public MountpointRegistrarImpl() {
+ LOG.info("Creating provider class for {}", APPLICATION_NAME);
+ }
+
+ public void init() {
+ LOG.info("Init call for {}", APPLICATION_NAME);
+
+ ConfigurationFileRepresentation configFileRepresentation =
+ new ConfigurationFileRepresentation(CONFIGURATIONFILE);
+ configFileRepresentation.registerConfigChangedListener(this);
+
+ generalConfig = new GeneralConfig(configFileRepresentation);
+ PNFRegistrationConfig pnfRegConfig = new PNFRegistrationConfig(configFileRepresentation);
+ FaultConfig faultConfig = new FaultConfig(configFileRepresentation);
+
+ configMap.put("pnfRegistration", pnfRegConfig);
+ configMap.put("fault", faultConfig);
+
+ dmaapEnabled = generalConfig.getEnabled();
+ if (dmaapEnabled) { // start dmaap consumer thread only if dmaapEnabled=true
+ LOG.info("DMaaP seems to be enabled, starting consumer(s)");
+ dmaapVESMsgConsumerMain = new Thread(new DMaaPVESMsgConsumerMain(configMap));
+ dmaapVESMsgConsumerMain.start();
+ } else {
+ LOG.info("DMaaP seems to be disabled, not starting any consumer(s)");
+ }
+ }
+
+ /**
+ * Reflect status for Unit Tests
+ *
+ * @return Text with status
+ */
+ public String isInitializationOk() {
+ return "No implemented";
+ }
+
+ @Override
+ public void onConfigChanged() {
+ LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
+ boolean dmaapEnabledNewVal = generalConfig.getEnabled();
+ if (!dmaapEnabled && dmaapEnabledNewVal) { // Dmaap disabled earlier (or during bundle startup) but enabled later, start Consumer(s)
+ LOG.info("DMaaP is enabled, starting consumer(s)");
+ dmaapVESMsgConsumerMain = new Thread(new DMaaPVESMsgConsumerMain(configMap));
+ dmaapVESMsgConsumerMain.start();
+ } else if (dmaapEnabled && !dmaapEnabledNewVal) { // Dmaap enabled earlier (or during bundle startup) but disabled later, stop consumer(s)
+ LOG.info("DMaaP is disabled, stopping consumer(s)");
+ List<DMaaPVESMsgConsumer> consumers = DMaaPVESMsgConsumerMain.getConsumers();
+ for (DMaaPVESMsgConsumer consumer : consumers) {
+ // stop all consumers
+ consumer.stopConsumer();
+ }
+ }
+ dmaapEnabled = dmaapEnabledNewVal;
+ }
+
+ @Override
+ public void close() throws Exception {
+ LOG.info("{} closing ...", this.getClass().getName());
+ //close(updateService, configService, mwtnService); issue#1
+ //close(updateService, mwtnService);
+ LOG.info("{} closing done", APPLICATION_NAME);
+ }
+
+ /**
+ * Used to close all Services, that should support AutoCloseable Pattern
+ *
+ * @param toClose
+ * @throws Exception
+ */
+ @SuppressWarnings("unused")
+ private void close(AutoCloseable... toCloseList) throws Exception {
+ for (AutoCloseable element : toCloseList) {
+ if (element != null) {
+ element.close();
+ }
+ }
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java
index ace1c0a39..f8653ca75 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFMountPointClient.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -22,112 +22,121 @@ import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
-
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPClient;
import org.onap.ccsdk.features.sdnr.wt.common.http.BaseHTTPResponse;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PNFMountPointClient extends BaseHTTPClient {
- private static final Logger LOG = LoggerFactory.getLogger(PNFMountPointClient.class);
- private static final String MOUNTPOINT_URI = "restconf/config/network-topology:network-topology/topology/topology-netconf/node/";
- private final Map<String, String> headerMap;
- private static final String SSH_PAYLOAD = "<node xmlns=\"urn:TBD:params:xml:ns:yang:network-topology\">\n" +
- " <node-id>@device-name@</node-id>\n" +
- " <host xmlns=\"urn:opendaylight:netconf-node-topology\">@device-ip@</host>\n" +
- " <port xmlns=\"urn:opendaylight:netconf-node-topology\">@device-port@</port>\n" +
- " <username xmlns=\"urn:opendaylight:netconf-node-topology\">@username@</username>\n" +
- " <password xmlns=\"urn:opendaylight:netconf-node-topology\">@password@</password>\n" +
- " <tcp-only xmlns=\"urn:opendaylight:netconf-node-topology\">false</tcp-only>\n" +
- " <!-- non-mandatory fields with default values, you can safely remove these if you do not wish to override any of these values-->\n" +
- " <reconnect-on-changed-schema xmlns=\"urn:opendaylight:netconf-node-topology\">false</reconnect-on-changed-schema>\n" +
- " <connection-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">20000</connection-timeout-millis>\n" +
- " <max-connection-attempts xmlns=\"urn:opendaylight:netconf-node-topology\">0</max-connection-attempts>\n" +
- " <between-attempts-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">2000</between-attempts-timeout-millis>\n" +
- " <sleep-factor xmlns=\"urn:opendaylight:netconf-node-topology\">1.5</sleep-factor>\n" +
- " <!-- keepalive-delay set to 0 turns off keepalives-->\n" +
- " <keepalive-delay xmlns=\"urn:opendaylight:netconf-node-topology\">120</keepalive-delay>\n" +
- "</node>";
- private static final String TLS_PAYLOAD = "<node xmlns=\"urn:TBD:params:xml:ns:yang:network-topology\">\n" +
- " <node-id>@device-name@</node-id>\n" +
- " <host xmlns=\"urn:opendaylight:netconf-node-topology\">@device-ip@</host>\n" +
- " <port xmlns=\"urn:opendaylight:netconf-node-topology\">@device-port@</port>\n" +
- " <key-based xmlns=\"urn:opendaylight:netconf-node-topology\">\n" +
- " <username xmlns=\"urn:opendaylight:netconf-node-topology\">@username@</username>\n" +
- " <key-id>@key-id@</key-id>\n" +
- " </key-based>\n" +
- " <tcp-only xmlns=\"urn:opendaylight:netconf-node-topology\">false</tcp-only>\n" +
- " <protocol xmlns=\"urn:opendaylight:netconf-node-topology\">\n" +
- " <name>TLS</name>\n" +
- " </protocol>\n" +
- " <!-- non-mandatory fields with default values, you can safely remove these if you do not wish to override any of these values-->\n" +
- " <reconnect-on-changed-schema xmlns=\"urn:opendaylight:netconf-node-topology\">false</reconnect-on-changed-schema>\n" +
- " <connection-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">20000</connection-timeout-millis>\n" +
- " <max-connection-attempts xmlns=\"urn:opendaylight:netconf-node-topology\">0</max-connection-attempts>\n" +
- " <between-attempts-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">2000</between-attempts-timeout-millis>\n" +
- " <sleep-factor xmlns=\"urn:opendaylight:netconf-node-topology\">1.5</sleep-factor>\n" +
- " <!-- keepalive-delay set to 0 turns off keepalives-->\n" +
- " <keepalive-delay xmlns=\"urn:opendaylight:netconf-node-topology\">120</keepalive-delay>\n" +
- "</node>";
-
- public PNFMountPointClient(String baseUrl) {
- super(baseUrl);
-
- this.headerMap = new HashMap<>();
+ private static final Logger LOG = LoggerFactory.getLogger(PNFMountPointClient.class);
+ private static final String MOUNTPOINT_URI =
+ "restconf/config/network-topology:network-topology/topology/topology-netconf/node/";
+ private final Map<String, String> headerMap;
+ // @formatter:off
+ private static final String SSH_PAYLOAD = "<node xmlns=\"urn:TBD:params:xml:ns:yang:network-topology\">\n"
+ + " <node-id>@device-name@</node-id>\n"
+ + " <host xmlns=\"urn:opendaylight:netconf-node-topology\">@device-ip@</host>\n"
+ + " <port xmlns=\"urn:opendaylight:netconf-node-topology\">@device-port@</port>\n"
+ + " <username xmlns=\"urn:opendaylight:netconf-node-topology\">@username@</username>\n"
+ + " <password xmlns=\"urn:opendaylight:netconf-node-topology\">@password@</password>\n"
+ + " <tcp-only xmlns=\"urn:opendaylight:netconf-node-topology\">false</tcp-only>\n"
+ + " <!-- non-mandatory fields with default values, you can safely remove these if you do not wish to override any of these values-->\n"
+ + " <reconnect-on-changed-schema xmlns=\"urn:opendaylight:netconf-node-topology\">false</reconnect-on-changed-schema>\n"
+ + " <connection-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">20000</connection-timeout-millis>\n"
+ + " <max-connection-attempts xmlns=\"urn:opendaylight:netconf-node-topology\">0</max-connection-attempts>\n"
+ + " <between-attempts-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">2000</between-attempts-timeout-millis>\n"
+ + " <sleep-factor xmlns=\"urn:opendaylight:netconf-node-topology\">1.5</sleep-factor>\n"
+ + " <!-- keepalive-delay set to 0 turns off keepalives-->\n"
+ + " <keepalive-delay xmlns=\"urn:opendaylight:netconf-node-topology\">120</keepalive-delay>\n"
+ + "</node>";
+ // @formatter:on
+ // @formatter:off
+ private static final String TLS_PAYLOAD = "<node xmlns=\"urn:TBD:params:xml:ns:yang:network-topology\">\n"
+ + " <node-id>@device-name@</node-id>\n"
+ + " <host xmlns=\"urn:opendaylight:netconf-node-topology\">@device-ip@</host>\n"
+ + " <port xmlns=\"urn:opendaylight:netconf-node-topology\">@device-port@</port>\n"
+ + " <key-based xmlns=\"urn:opendaylight:netconf-node-topology\">\n"
+ + " <username xmlns=\"urn:opendaylight:netconf-node-topology\">@username@</username>\n"
+ + " <key-id>@key-id@</key-id>\n"
+ + " </key-based>\n"
+ + " <tcp-only xmlns=\"urn:opendaylight:netconf-node-topology\">false</tcp-only>\n"
+ + " <protocol xmlns=\"urn:opendaylight:netconf-node-topology\">\n"
+ + " <name>TLS</name>\n"
+ + " </protocol>\n"
+ + " <!-- non-mandatory fields with default values, you can safely remove these if you do not wish to override any of these values-->\n"
+ + " <reconnect-on-changed-schema xmlns=\"urn:opendaylight:netconf-node-topology\">false</reconnect-on-changed-schema>\n"
+ + " <connection-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">20000</connection-timeout-millis>\n"
+ + " <max-connection-attempts xmlns=\"urn:opendaylight:netconf-node-topology\">0</max-connection-attempts>\n"
+ + " <between-attempts-timeout-millis xmlns=\"urn:opendaylight:netconf-node-topology\">2000</between-attempts-timeout-millis>\n"
+ + " <sleep-factor xmlns=\"urn:opendaylight:netconf-node-topology\">1.5</sleep-factor>\n"
+ + " <!-- keepalive-delay set to 0 turns off keepalives-->\n"
+ + " <keepalive-delay xmlns=\"urn:opendaylight:netconf-node-topology\">120</keepalive-delay>\n"
+ + "</node>";
+ // @formatter:on
+
+ public PNFMountPointClient(String baseUrl) {
+ super(baseUrl);
+
+ this.headerMap = new HashMap<>();
this.headerMap.put("Content-Type", "application/xml");
this.headerMap.put("Accept", "application/xml");
- }
-
- public void setAuthorization(String username, String password) {
- String credentials = username + ":" + password;
- this.headerMap.put("Authorization",
- "Basic " + new String(Base64.getEncoder().encode(credentials.getBytes())));
-
- }
-
- public boolean pnfMountPointCreate(String pnfName, String ipv4Address, String protocol, String keyId, String username, String password, String commPort) {
- String message = "";
- if (protocol.equals("TLS")) {
- message = updateTLSPayload(pnfName, ipv4Address, username, keyId, commPort);
- } else { //SSH
- message = updatePayload(pnfName, ipv4Address, username, password, commPort);
- LOG.debug("Payload after updating values is: {}",message);
- }
- return pnfRequest(pnfName, "PUT", message) == 200;
-
- }
-
- private static String updatePayload(String pnfName, String ipv4Address, String username, String password, String portNo) {
- return SSH_PAYLOAD.replace("@device-name@", pnfName)
- .replace("@device-ip@", ipv4Address)
- .replace("@device-port@", portNo)
- .replace("@username@", username)
- .replace("@password@", password);
- }
-
- private static String updateTLSPayload(String pnfName, String ipv4Address, String username, String keyId, String portNo) {
- return TLS_PAYLOAD.replace("@device-name@", pnfName)
- .replace("@device-ip@", ipv4Address)
- .replace("@username@", username)
- .replace("@key-id@", keyId)
- .replace("@device-port@", portNo);
- }
-
- private int pnfRequest(String pnfName, String method, String message) {
- LOG.info("In pnfRequest - {} : {} : {}",pnfName,method,message);
- BaseHTTPResponse response;
- try {
- //String uri = "http://localhost:8181/restconf/config/network-topology:network-topology/topology/topology-netconf/node/" + pnfName;
- String uri = MOUNTPOINT_URI + pnfName;
- response = this.sendRequest(uri, method, message, headerMap);
- LOG.debug("finished with responsecode {}", response.code);
- return response.code;
- } catch (IOException e) {
- LOG.warn("problem registering {} : {}", pnfName, e.getMessage());
- return -1;
- }
- }
+ }
+
+ public void setAuthorization(String username, String password) {
+ String credentials = username + ":" + password;
+ this.headerMap.put("Authorization", "Basic " + new String(Base64.getEncoder().encode(credentials.getBytes())));
+
+ }
+
+ public boolean pnfMountPointCreate(String pnfName, String ipv4Address, String protocol, String keyId,
+ String username, String password, String commPort) {
+ String message = "";
+ if (protocol.equals("TLS")) {
+ message = updateTLSPayload(pnfName, ipv4Address, username, keyId, commPort);
+ } else { //SSH
+ message = updatePayload(pnfName, ipv4Address, username, password, commPort);
+ LOG.debug("Payload after updating values is: {}", message);
+ }
+ return pnfRequest(pnfName, "PUT", message) == 200;
+
+ }
+
+ private static String updatePayload(String pnfName, String ipv4Address, String username, String password,
+ String portNo) {
+ // @formatter:off
+ return SSH_PAYLOAD.replace("@device-name@", pnfName)
+ .replace("@device-ip@", ipv4Address)
+ .replace("@device-port@", portNo)
+ .replace("@username@", username)
+ .replace("@password@", password);
+ // @formatter:on
+ }
+
+ private static String updateTLSPayload(String pnfName, String ipv4Address, String username, String keyId,
+ String portNo) {
+ // @formatter:off
+ return TLS_PAYLOAD.replace("@device-name@", pnfName)
+ .replace("@device-ip@", ipv4Address)
+ .replace("@username@", username)
+ .replace("@key-id@", keyId)
+ .replace("@device-port@", portNo);
+ // @formatter:on
+ }
+
+ private int pnfRequest(String pnfName, String method, String message) {
+ LOG.info("In pnfRequest - {} : {} : {}", pnfName, method, message);
+ BaseHTTPResponse response;
+ try {
+ //String uri = "http://localhost:8181/restconf/config/network-topology:network-topology/topology/topology-netconf/node/" + pnfName;
+ String uri = MOUNTPOINT_URI + pnfName;
+ response = this.sendRequest(uri, method, message, headerMap);
+ LOG.debug("finished with responsecode {}", response.code);
+ return response.code;
+ } catch (IOException e) {
+ LOG.warn("problem registering {} : {}", pnfName, e.getMessage());
+ return -1;
+ }
+ }
}
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java
index 79f96db32..a8e12767f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/PNFRegistrationConfig.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*
* ============LICENSE_START========================================================================
* ONAP : ccsdk feature sdnr wt
* =================================================================================================
@@ -14,7 +14,7 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
* ============LICENSE_END==========================================================================
- ******************************************************************************/
+ */
package org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl;
@@ -22,20 +22,21 @@ import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
public class PNFRegistrationConfig implements Configuration {
- private static final String SECTION_MARKER = "pnfRegistration";
+ private static final String SECTION_MARKER = "pnfRegistration";
- private static final String PROPERTY_KEY_CONSUMER_CLASS = "pnfRegConsumerClass";
- private static final String DEFAULT_VALUE_CONSUMER_CLASS = "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
+ private static final String PROPERTY_KEY_CONSUMER_CLASS = "pnfRegConsumerClass";
+ private static final String DEFAULT_VALUE_CONSUMER_CLASS =
+ "org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.DMaaPPNFRegVESMsgConsumer";
public static final String PROPERTY_KEY_CONSUMER_TRANSPORTTYPE = "TransportType";
private static final String DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE = "HTTPNOAUTH";
-
+
public static final String PROPERTY_KEY_CONSUMER_PROTOCOL = "Protocol";
private static final String DEFAULT_VALUE_CONSUMER_PROTOCOL = "http";
-
+
public static final String PROPERTY_KEY_CONSUMER_USERNAME = "username";
private static final String DEFAULT_VALUE_CONSUMER_USERNAME = "username";
-
+
public static final String PROPERTY_KEY_CONSUMER_PASSWORD = "password";
private static final String DEFAULT_VALUE_CONSUMER_PASSWORD = "password";
@@ -62,102 +63,117 @@ public class PNFRegistrationConfig implements Configuration {
public static final String PROPERTY_KEY_CONSUMER_FETCHPAUSE = "fetchPause";
private static final String DEFAULT_VALUE_CONSUMER_FETCHPAUSE = "5000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT="jersey.config.client.readTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT="25000";
-
- public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT="jersey.config.client.connectTimeout";
- private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT="25000";
-
- private final ConfigurationFileRepresentation configuration;
-
- public PNFRegistrationConfig(ConfigurationFileRepresentation configuration) {
- this.configuration = configuration;
- this.configuration.addSection(SECTION_MARKER);
- defaults();
- }
- @Override
- public String getSectionName() {
- return SECTION_MARKER;
- }
-
- @Override
- public void defaults() {
-
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS, DEFAULT_VALUE_CONSUMER_CLASS);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE, DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL, DEFAULT_VALUE_CONSUMER_PROTOCOL);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME, DEFAULT_VALUE_CONSUMER_USERNAME);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD, DEFAULT_VALUE_CONSUMER_PASSWORD);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT, DEFAULT_VALUE_CONSUMER_HOST_PORT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC, DEFAULT_VALUE_CONSUMER_TOPIC);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE, DEFAULT_VALUE_CONSUMER_CONTENTTYPE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP, DEFAULT_VALUE_CONSUMER_GROUP);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT, DEFAULT_VALUE_CONSUMER_TIMEOUT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT, DEFAULT_VALUE_CONSUMER_LIMIT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE, DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT, DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT);
- configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT, DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT);
- }
-
- public String getConsumerClass() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS);
- }
-
- public String getHostPort() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT);
- }
-
- public String getTransportType() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE);
- }
-
- public String getProtocol() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL);
- }
-
- public String getUsername() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME);
- }
-
- public String getPassword() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD);
- }
-
- public String getTopic() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC);
- }
-
- public String getConsumerGroup() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP);
- }
-
- public String getConsumerId() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID);
- }
-
- public String getTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT);
- }
-
- public String getLimit() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT);
- }
-
- public String getFetchPause() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
- }
-
- public String getContenttype() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE);
- }
-
- public String getClientReadTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT);
- }
-
- public String getClientConnectTimeout() {
- return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT);
- }
+
+ public static final String PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT = "jersey.config.client.readTimeout";
+ private static final String DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT = "25000";
+
+ public static final String PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT = "jersey.config.client.connectTimeout";
+ private static final String DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT = "25000";
+
+ private final ConfigurationFileRepresentation configuration;
+
+ public PNFRegistrationConfig(ConfigurationFileRepresentation configuration) {
+ this.configuration = configuration;
+ this.configuration.addSection(SECTION_MARKER);
+ defaults();
+ }
+
+ @Override
+ public String getSectionName() {
+ return SECTION_MARKER;
+ }
+
+ @Override
+ public void defaults() {
+
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS,
+ DEFAULT_VALUE_CONSUMER_CLASS);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE,
+ DEFAULT_VALUE_CONSUMER_TRANSPORTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL,
+ DEFAULT_VALUE_CONSUMER_PROTOCOL);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME,
+ DEFAULT_VALUE_CONSUMER_USERNAME);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD,
+ DEFAULT_VALUE_CONSUMER_PASSWORD);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT,
+ DEFAULT_VALUE_CONSUMER_HOST_PORT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC,
+ DEFAULT_VALUE_CONSUMER_TOPIC);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE,
+ DEFAULT_VALUE_CONSUMER_CONTENTTYPE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP,
+ DEFAULT_VALUE_CONSUMER_GROUP);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID, DEFAULT_VALUE_CONSUMER_ID);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT,
+ DEFAULT_VALUE_CONSUMER_TIMEOUT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT,
+ DEFAULT_VALUE_CONSUMER_LIMIT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE,
+ DEFAULT_VALUE_CONSUMER_FETCHPAUSE);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT,
+ DEFAULT_VALUE_CONSUMER_CLIENT_READTIMEOUT);
+ configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT,
+ DEFAULT_VALUE_CONSUMER_CLIENT_CONNECTTIMEOUT);
+ }
+
+ public String getConsumerClass() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLASS);
+ }
+
+ public String getHostPort() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_HOST_PORT);
+ }
+
+ public String getTransportType() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TRANSPORTTYPE);
+ }
+
+ public String getProtocol() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PROTOCOL);
+ }
+
+ public String getUsername() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_USERNAME);
+ }
+
+ public String getPassword() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_PASSWORD);
+ }
+
+ public String getTopic() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TOPIC);
+ }
+
+ public String getConsumerGroup() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_GROUP);
+ }
+
+ public String getConsumerId() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_ID);
+ }
+
+ public String getTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_TIMEOUT);
+ }
+
+ public String getLimit() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_LIMIT);
+ }
+
+ public String getFetchPause() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_FETCHPAUSE);
+ }
+
+ public String getContenttype() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CONTENTTYPE);
+ }
+
+ public String getClientReadTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_READTIMEOUT);
+ }
+
+ public String getClientConnectTimeout() {
+ return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_CONSUMER_CLIENT_CONNECTTIMEOUT);
+ }
}