aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/functions/netconf-executor/src/main
diff options
context:
space:
mode:
authorOleg Mitsura <oleg.mitsura@amdocs.com>2019-04-10 22:32:11 -0400
committerOleg Mitsura <oleg.mitsura@amdocs.com>2019-04-11 08:05:11 -0400
commit7581c93cbe2aedde587be766628b3a2d5b6258b0 (patch)
tree5c5d459559b37faa539dc33a4bb2dc6be79f9150 /ms/blueprintsprocessor/functions/netconf-executor/src/main
parent0c0e1d2e6d5e57a99ff8551a2d7b8e9cdab7d860 (diff)
netconf-executor: NetconfSessionImplTest improvements
Issue-ID: CCSDK-1126 Change-Id: Ied0360a37f8f22801c63c2aeb70ee73d45cc7b4b Signed-off-by: Oleg Mitsura <oleg.mitsura@amdocs.com>
Diffstat (limited to 'ms/blueprintsprocessor/functions/netconf-executor/src/main')
-rw-r--r--ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt2
-rw-r--r--ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt63
2 files changed, 38 insertions, 27 deletions
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
index 6ef4f41fb..aa156e2a8 100644
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
@@ -57,7 +57,7 @@ class NetconfDeviceCommunicator(private var inputStream: InputStream,
while (!socketClosed) {
val cInt = bufferReader.read()
if (cInt == -1) {
- log.error("$deviceInfo: Received cInt = -1")
+ log.debug("$deviceInfo: Received end of stream, closing socket.")
socketClosed = true
}
val c = cInt.toChar()
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
index 7e56e3e51..13c1bfa37 100644
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
@@ -80,11 +80,11 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
RpcStatus.FAILURE, true)) {
rpcService.closeSession(true)
}
-
- session.close()
- // Closes the socket which should interrupt the streamHandler
- channel.close()
- client.close()
+ try {
+ close()
+ } catch (ioe: IOException) {
+ log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
+ }
}
override fun reconnect() {
@@ -98,8 +98,8 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
checkAndReestablish()
try {
- return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS)
-// replies.remove(messageId)
+ return streamHandler.getFutureFromSendMessage(streamHandler.sendMessage(formattedRequest, messageId),
+ replyTimeout.toLong(), TimeUnit.SECONDS)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e)
@@ -109,10 +109,7 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
} catch (e: ExecutionException) {
log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e)
try {
- session.close()
- // Closes the socket which should interrupt the streamHandler
- channel.close()
- client.close()
+ close()
} catch (ioe: IOException) {
log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
}
@@ -138,20 +135,23 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
override fun checkAndReestablish() {
try {
- if (client.isClosed) {
- log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
- clearReplies()
- startConnection()
- } else if (session.isClosed) {
- log.info("Trying to restart the session with {}", deviceInfo)
- clearReplies()
- startSession()
- } else if (channel.isClosed) {
- log.info("Trying to reopen the channel with {}", deviceInfo)
- clearReplies()
- openChannel()
- } else {
- return
+ when {
+ client.isClosed -> {
+ log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
+ clearReplies()
+ startConnection()
+ }
+ session.isClosed -> {
+ log.info("Trying to restart the session with {}", deviceInfo)
+ clearReplies()
+ startSession()
+ }
+ channel.isClosed -> {
+ log.info("Trying to reopen the channel with {}", deviceInfo)
+ clearReplies()
+ openChannel()
+ }
+ else -> return
}
} catch (e: IOException) {
log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
@@ -258,7 +258,7 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse)
while (capabilityMatcher.find()) {
- deviceCapabilities.plus(capabilityMatcher.group(1))
+ deviceCapabilities.add(capabilityMatcher.group(1))
}
}
@@ -293,6 +293,17 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ
}
/**
+ * Closes the session/channel/client
+ */
+ @Throws(IOException::class)
+ private fun close() {
+ session.close()
+ // Closes the socket which should interrupt the streamHandler
+ channel.close()
+ client.close()
+ }
+
+ /**
* Internal function for accessing replies for testing.
*/
internal fun getReplies() = replies