diff options
Diffstat (limited to 'ms')
3 files changed, 43 insertions, 46 deletions
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt index 6ef4f41fb..aa156e2a8 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt @@ -57,7 +57,7 @@ class NetconfDeviceCommunicator(private var inputStream: InputStream, while (!socketClosed) { val cInt = bufferReader.read() if (cInt == -1) { - log.error("$deviceInfo: Received cInt = -1") + log.debug("$deviceInfo: Received end of stream, closing socket.") socketClosed = true } val c = cInt.toChar() diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt index 7e56e3e51..b1121b3d4 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt @@ -80,11 +80,11 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ RpcStatus.FAILURE, true)) { rpcService.closeSession(true) } - - session.close() - // Closes the socket which should interrupt the streamHandler - channel.close() - client.close() + try { + close() + } catch (ioe: IOException) { + log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe) + } } override fun reconnect() { @@ -98,8 +98,8 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ checkAndReestablish() try { - return streamHandler.sendMessage(formattedRequest, messageId).get(replyTimeout.toLong(), TimeUnit.SECONDS) -// replies.remove(messageId) + return streamHandler.getFutureFromSendMessage(streamHandler.sendMessage(formattedRequest, messageId), + replyTimeout.toLong(), TimeUnit.SECONDS) } catch (e: InterruptedException) { Thread.currentThread().interrupt() throw NetconfException("$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest", e) @@ -109,10 +109,7 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ } catch (e: ExecutionException) { log.warn("$deviceInfo: Closing session($sessionId) due to unexpected Error", e) try { - session.close() - // Closes the socket which should interrupt the streamHandler - channel.close() - client.close() + close() } catch (ioe: IOException) { log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe) } @@ -138,20 +135,23 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ override fun checkAndReestablish() { try { - if (client.isClosed) { - log.info("Trying to restart the whole SSH connection with {}", deviceInfo) - clearReplies() - startConnection() - } else if (session.isClosed) { - log.info("Trying to restart the session with {}", deviceInfo) - clearReplies() - startSession() - } else if (channel.isClosed) { - log.info("Trying to reopen the channel with {}", deviceInfo) - clearReplies() - openChannel() - } else { - return + when { + client.isClosed -> { + log.info("Trying to restart the whole SSH connection with {}", deviceInfo) + clearReplies() + startConnection() + } + session.isClosed -> { + log.info("Trying to restart the session with {}", deviceInfo) + clearReplies() + startSession() + } + channel.isClosed -> { + log.info("Trying to reopen the channel with {}", deviceInfo) + clearReplies() + openChannel() + } + else -> return } } catch (e: IOException) { log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message) @@ -257,16 +257,8 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ } val capabilityMatcher = NetconfMessageUtils.CAPABILITY_REGEX_PATTERN.matcher(serverHelloResponse) - while (capabilityMatcher.find()) { - deviceCapabilities.plus(capabilityMatcher.group(1)) - } - } - - fun sessionstatus(state:String): Boolean{ - return when (state){ - "Close" -> channel.isClosed - "Open" -> channel.isOpen - else -> false + while (capabilityMatcher.find()) { //TODO: refactor to add unit test easily for device capability accumulation. + deviceCapabilities.add(capabilityMatcher.group(1)) } } @@ -279,7 +271,6 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ * Used by {@link NetconfSessionListenerImpl} */ internal fun addDeviceErrorReply(errReply: String) { - println("addDeviceErrorReply (errReply: $errReply") //TODO : get rid of this. errorReplies.add(errReply) } @@ -288,11 +279,21 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ * Used by {@link NetconfSessionListenerImpl} */ internal fun addDeviceReply(messageId: String, replyMsg: String) { - println("addDeviceReply (messageId: $messageId replyMsg: $replyMsg") //TODO : get rid of this. replies[messageId]?.complete(replyMsg) } /** + * Closes the session/channel/client + */ + @Throws(IOException::class) + private fun close() { + session.close() + // Closes the socket which should interrupt the streamHandler + channel.close() + client.close() + } + + /** * Internal function for accessing replies for testing. */ internal fun getReplies() = replies @@ -301,7 +302,6 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ * internal function for accessing errorReplies for testing. */ internal fun getErrorReplies() = errorReplies - internal fun clearErrorReplies() = errorReplies.clear() internal fun clearReplies() = replies.clear() internal fun setClient(client: SshClient) { this.client = client } diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt index 1f526f445..f5fd5410a 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt @@ -168,7 +168,6 @@ class NetconfSessionImplTest { } - @Ignore //TODO undo close method removal @Test fun `disconnect wraps exception from ssh closing error`() { val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) @@ -295,7 +294,6 @@ class NetconfSessionImplTest { } } - @Ignore //TODO revert back on getFutureFromSendMessage @Test fun `syncRpc throws NetconfException if TimeoutException is caught`() { val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec." @@ -310,23 +308,23 @@ class NetconfSessionImplTest { } } - @Ignore @Test fun `syncRpc throws NetconfException if ExecutionException is caught`() { val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest" assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) { - val netconfSessionSpy = spyk(netconfSession) + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = false) val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws - ExecutionException("exec exception", Exception("nested exception")) //TODO revert getFutureFromSendMessage back + ExecutionException("exec exception", Exception("nested exception")) + every { netconfSessionSpy["close"]() as Unit } just Runs every { netconfSessionSpy.checkAndReestablish() } just Runs + netconfSessionSpy.setSession(mockClientSession) //call the method netconfSessionSpy.syncRpc("0", "0") } } - @Ignore //TODO revert back on getFutureFromSendMessage @Test fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() { val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest" @@ -525,7 +523,6 @@ class NetconfSessionImplTest { verify { mockSshClient.close() } } - @Ignore @Test fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE |