diff options
Diffstat (limited to 'ms/blueprintsprocessor/functions')
6 files changed, 580 insertions, 313 deletions
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt index 12e3b83da..6ef4f41fb 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt @@ -30,6 +30,7 @@ import java.io.OutputStream import java.io.OutputStreamWriter import java.nio.charset.StandardCharsets import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit class NetconfDeviceCommunicator(private var inputStream: InputStream, private var out: OutputStream, @@ -232,4 +233,21 @@ class NetconfDeviceCommunicator(private var inputStream: InputStream, NetconfMessageUtils.getMsgId(deviceReply), deviceInfo)) } + + /** + * Gets the value of the {@link CompletableFuture} from {@link NetconfDeviceCommunicator#sendMessage} + * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl. + * @param fut {@link CompletableFuture} object + * @param timeout the maximum time to wait + * @param timeUnit the time unit of the timeout argument + * @return the result value + * @throws CancellationException if this future was cancelled + * @throws ExecutionException if this future completed exceptionally + * @throws InterruptedException if the current thread was interrupted while waiting + * @throws TimeoutException if the wait timed outStream + */ + internal fun getFutureFromSendMessage( + fut: CompletableFuture<String>, timeout: Long, timeUnit: TimeUnit): String { + return fut.get(timeout, timeUnit) + } } diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt index 12eb43f45..7e56e3e51 100644 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt @@ -34,22 +34,21 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.R import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus import org.slf4j.LoggerFactory import java.io.IOException -import java.util.* +import java.util.Collections import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutionException import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicReference class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) : NetconfSession { private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java) - private val errorReplies: MutableList<String> = Collections.synchronizedList(listOf()) + private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf()) private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap() - private val deviceCapabilities = setOf<String>() + private val deviceCapabilities = mutableSetOf<String>() private var connectionTimeout: Long = 0 private var replyTimeout: Int = 0 @@ -117,11 +116,8 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ } catch (ioe: IOException) { log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe) } - -// NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "", -// "Closed due to unexpected error " + e.cause, "-1", deviceInfo) - errorReplies.clear() // move to cleanUp()? - replies.clear() + clearErrorReplies() + clearReplies() throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e) } @@ -144,27 +140,26 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ try { if (client.isClosed) { log.info("Trying to restart the whole SSH connection with {}", deviceInfo) - replies.clear() + clearReplies() startConnection() } else if (session.isClosed) { log.info("Trying to restart the session with {}", deviceInfo) - replies.clear() + clearReplies() startSession() } else if (channel.isClosed) { log.info("Trying to reopen the channel with {}", deviceInfo) - replies.clear() + clearReplies() openChannel() } else { return } } catch (e: IOException) { - log.error("Can't reopen connection for device {}", e.message) + log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message) throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e) } catch (e: IllegalStateException) { - log.error("Can't reopen connection for device {}", e.message) + log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message) throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e) } - } override fun getDeviceInfo(): DeviceInfo { @@ -191,8 +186,13 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ } - private fun startClient() { + //Needed to unit test connect method interacting with client.start in startClient() below + private fun setupNewSSHClient() { client = SshClient.setUpDefaultClient() + } + + private fun startClient() { + setupNewSSHClient() client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong())) client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L)) @@ -304,4 +304,7 @@ class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcServ internal fun clearErrorReplies() = errorReplies.clear() internal fun clearReplies() = replies.clear() + internal fun setClient(client: SshClient) { this.client = client } + internal fun setSession(session: ClientSession) { this.session = session } + internal fun setChannel(channel: ClientChannel) { this.channel = channel } }
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt deleted file mode 100644 index b462ad0e8..000000000 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright © 2017-2018 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor - -import org.apache.sshd.client.channel.ChannelSubsystem -import org.apache.sshd.client.session.ClientSessionImpl -import org.junit.After -import org.junit.Assert -import org.junit.Before -import org.junit.Test -import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo -import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core.NetconfRpcServiceImpl -import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core.NetconfSessionImpl -import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks.NetconfDeviceSimulator -import java.util.concurrent.atomic.AtomicReference -import kotlin.script.experimental.api.asSuccess - -class NetconfSessionImplTest { - - private var device: NetconfDeviceSimulator? = null - private var deviceInfo: DeviceInfo? = null - - @Before - fun before() { - deviceInfo = DeviceInfo().apply { - username = "username" - password = "password" - ipAddress = "localhost" - port = 2224 - connectTimeout = 10 - } - - device = NetconfDeviceSimulator(deviceInfo!!.port) - device!!.start() - } - - @After - fun after() { - device!!.stop() - } - - @Throws(Exception::class) - fun testNetconfSession() { - val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(DeviceInfo())) - - Assert.assertNotNull(netconfSession.getSessionId()) - Assert.assertEquals("localhost:2224", netconfSession.getDeviceInfo().toString()) - - netconfSession.checkAndReestablish() - - Assert.assertNotNull(netconfSession.getSessionId()) - Assert.assertEquals("localhost:2224", netconfSession.getDeviceInfo().toString()) - - Assert.assertTrue(!netconfSession.getDeviceCapabilitiesSet().isEmpty()) - } - - @Test - fun testNetconfSessionconnect() { - val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!)) - netconfSession.connect() - Assert.assertTrue(netconfSession.sessionstatus("Open")) - } - - @Test - fun testNetconfSessionreconnect() { - val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!)) - netconfSession.connect() - netconfSession.reconnect() - Assert.assertTrue(netconfSession.sessionstatus("Open")) - - } - @Test - fun testNetconfSessiondisconnect() { - val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!)) - netconfSession.connect() - netconfSession.disconnect() - Assert.assertTrue(netconfSession.sessionstatus("Close")) - - } - @Test - fun testNetconfSessioncheckAndReestablish() { - val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!)) - netconfSession.connect() - netconfSession.checkAndReestablish() - Assert.assertTrue(netconfSession.sessionstatus("Open")) - - - } - @Test - fun testNetconfSessionconnecgetDeviceInfo() { - val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!)) - netconfSession.connect() - Assert.assertNotNull(netconfSession.getDeviceInfo()) - Assert.assertFalse(!netconfSession.getDeviceCapabilitiesSet().isEmpty()) - } - - -} diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt new file mode 100644 index 000000000..1f526f445 --- /dev/null +++ b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt @@ -0,0 +1,543 @@ +/* + * Copyright © 2019 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core + +import io.mockk.CapturingSlot +import io.mockk.Runs +import io.mockk.every +import io.mockk.just +import io.mockk.mockk +import io.mockk.spyk +import io.mockk.verify +import org.apache.sshd.client.SshClient +import org.apache.sshd.client.channel.ChannelSubsystem +import org.apache.sshd.client.channel.ClientChannel +import org.apache.sshd.client.future.DefaultAuthFuture +import org.apache.sshd.client.future.DefaultConnectFuture +import org.apache.sshd.client.future.DefaultOpenFuture +import org.apache.sshd.client.session.ClientSession +import org.apache.sshd.common.FactoryManager +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo +import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse +import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException +import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService +import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils +import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream +import java.io.IOException +import java.io.InputStream +import java.nio.charset.StandardCharsets +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutionException +import java.util.concurrent.TimeoutException +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class NetconfSessionImplTest { + companion object { + val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply { + status = RpcStatus.SUCCESS + errorMessage = "" + responseMessage = "" + requestMessage = "" + } + val FAILED_DEVICE_RESPONSE = DeviceResponse().apply { + status = RpcStatus.FAILURE + errorMessage = "" + responseMessage = "" + requestMessage = "" + } + val deviceInfo: DeviceInfo = DeviceInfo().apply { + username = "username" + password = "password" + ipAddress = "localhost" + port = 2224 + connectTimeout = 10 + } + private const val someString = "Some string" + } + + private lateinit var netconfSession: NetconfSessionImpl + private lateinit var netconfCommunicator: NetconfDeviceCommunicator + private lateinit var rpcService: NetconfRpcService + private lateinit var mockSshClient: SshClient + private lateinit var mockClientSession: ClientSession + private lateinit var mockClientChannel: ClientChannel + private lateinit var mockSubsystem: ChannelSubsystem + + private val futureMsg = "blahblahblah" + private val request = "0" + private val sessionId = "0" + private val messageId = "asdfasdfadf" + private val deviceCapabilities = setOf("capability1", "capability2") + private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities) + private lateinit var sampleInputStream: InputStream + private lateinit var sampleOutputStream: ByteArrayOutputStream + + @Before + fun setup() { + netconfCommunicator = mockk() + rpcService = mockk() + netconfSession = NetconfSessionImpl(deviceInfo, rpcService) + netconfSession.setStreamHandler(netconfCommunicator) + mockSshClient = mockk() + mockClientSession = mockk() + mockClientChannel = mockk() + mockSubsystem = mockk() + sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8)) + sampleOutputStream = ByteArrayOutputStream() + } + + @Test + fun `connect calls appropriate methods`() { + val session = spyk(netconfSession, recordPrivateCalls = true) + every { session["startClient"]() as Unit } just Runs + session.connect() + verify { session["startClient"]() } + } + + //look for NetconfException being thrown when cannot connect + @Test + fun `connect throws NetconfException on error`() { + val errMsg = "$deviceInfo: Failed to establish SSH session" + assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg) + netconfSessionSpy.connect() + } + } + + @Test + fun `disconnect without force option for rpcService succeeds`() { + //rpcService.closeSession succeeds with status not RpcStatus.FAILURE + every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE + every { mockClientSession.close() } just Runs + every { mockSshClient.close() } just Runs + every { mockClientChannel.close() } just Runs + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + netconfSessionSpy.setSession(mockClientSession) + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.setChannel(mockClientChannel) + //RUN + netconfSessionSpy.disconnect() + //make sure that rpcService.close session is not called again. + verify(exactly = 0) { rpcService.closeSession(true) } + verify { mockClientSession.close() } + verify { mockSshClient.close() } + verify { mockClientChannel.close() } + } + + @Test + fun `disconnect with force option for rpcService succeeds`() { + //rpcService.closeSession succeeds with status not RpcStatus.FAILURE + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { rpcService.closeSession(any()) } returns + FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE + every { mockClientSession.close() } just Runs + every { mockSshClient.close() } just Runs + every { mockClientChannel.close() } just Runs + netconfSessionSpy.setSession(mockClientSession) + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.setChannel(mockClientChannel) + //RUN + netconfSessionSpy.disconnect() + //VERIFY + verify(exactly = 2) { rpcService.closeSession(any()) } + verify { mockClientSession.close() } + verify { mockSshClient.close() } + verify { mockClientChannel.close() } + + } + + @Ignore //TODO undo close method removal + @Test + fun `disconnect wraps exception from ssh closing error`() { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!") + every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE + every { netconfSessionSpy.checkAndReestablish() } just Runs + netconfSessionSpy.disconnect() + verify { netconfSessionSpy["close"]() } + } + + @Test + fun `reconnect calls disconnect and connect`() { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy.disconnect() } just Runs + every { netconfSessionSpy.connect() } just Runs + netconfSessionSpy.reconnect() + verify { netconfSessionSpy.disconnect() } + verify { netconfSessionSpy.connect() } + } + + @Test + fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { mockSshClient.isClosed } returns true + netconfSessionSpy.setClient(mockSshClient) + every { netconfSessionSpy["startConnection"]() as Unit } just Runs + //Call method + netconfSessionSpy.checkAndReestablish() + //Verify + verify { netconfSessionSpy.clearReplies() } + verify { netconfSessionSpy["startConnection"]() } + } + + @Test + fun `checkAndReestablish restarts session and clears replies on clientSession closing`() { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { mockClientSession.isClosed } returns true + every { mockSshClient.isClosed } returns false + every { netconfSessionSpy["startSession"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.setSession(mockClientSession) + //Call method + netconfSessionSpy.checkAndReestablish() + //Verify + verify { netconfSessionSpy.clearReplies() } + verify { netconfSessionSpy["startSession"]() } + } + + @Test + fun `checkAndReestablish reopens channel and clears replies on channel closing`() { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { mockClientSession.isClosed } returns false + every { mockSshClient.isClosed } returns false + every { mockClientChannel.isClosed } returns true + every { netconfSessionSpy["openChannel"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.setSession(mockClientSession) + netconfSessionSpy.setChannel(mockClientChannel) + //Call method + netconfSessionSpy.checkAndReestablish() + //Verify + verify { netconfSessionSpy.clearReplies() } + verify { netconfSessionSpy["openChannel"]() } + } + + + @Test + fun `syncRpc runs normally`() { + val netconfSessionSpy = spyk(netconfSession) + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + + //test the case where SSH connection did not need to be re-established. + //put an existing item into the replies + netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2") + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get() + every { netconfSessionSpy.checkAndReestablish() } just Runs + //call the method + assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0")) + //make sure the replies didn't change + assertTrue { + netconfSessionSpy.getReplies().size == 1 && + netconfSessionSpy.getReplies().containsKey("somekey") + } + verify(exactly = 0) { netconfSessionSpy.clearReplies() } + } + + + @Test + fun `syncRpc still succeeds and replies are cleared on client disconnect`() { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + + //put an item into the replies + netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2") + + //tests the case where SSH session needs to be re-established. + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + every { netconfSessionSpy["startClient"]() as Unit } just Runs + every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get() + every { mockSshClient.isClosed } returns true + netconfSessionSpy.setClient(mockSshClient) + + //call the method + assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0")) + //make sure the replies got cleared out + assertTrue { netconfSessionSpy.getReplies().isEmpty() } + verify(exactly = 1) { netconfSessionSpy.clearReplies() } + } + + @Ignore //TODO + //Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator + @Test + fun `syncRpc throws NetconfException if InterruptedException is caught`() { + val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest" + assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) { + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted") + every { netconfSessionSpy.checkAndReestablish() } just Runs + //call the method + netconfSessionSpy.syncRpc("0", "0") + } + } + + @Ignore //TODO revert back on getFutureFromSendMessage + @Test + fun `syncRpc throws NetconfException if TimeoutException is caught`() { + val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec." + assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) { + val netconfSessionSpy = spyk(netconfSession) + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out") + every { netconfSessionSpy.checkAndReestablish() } just Runs + //call the method + netconfSessionSpy.syncRpc("0", "0") + } + } + + @Ignore + @Test + fun `syncRpc throws NetconfException if ExecutionException is caught`() { + val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest" + assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) { + val netconfSessionSpy = spyk(netconfSession) + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws + ExecutionException("exec exception", Exception("nested exception")) //TODO revert getFutureFromSendMessage back + every { netconfSessionSpy.checkAndReestablish() } just Runs + //call the method + netconfSessionSpy.syncRpc("0", "0") + } + } + + @Ignore //TODO revert back on getFutureFromSendMessage + @Test + fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() { + val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest" + assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) { + val netconfSessionSpy = spyk(netconfSession) + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws + ExecutionException("exec exception", Exception("nested exception")) + every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception") + every { netconfSessionSpy.checkAndReestablish() } just Runs + //call the method + netconfSessionSpy.syncRpc("0", "0") + //make sure replies are cleared... + verify(exactly = 1) { netconfSessionSpy.clearReplies() } + verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() } + } + } + + @Test + fun `asyncRpc runs normally`() { + val netconfSessionSpy = spyk(netconfSession) + every { netconfSessionSpy.checkAndReestablish() } just Runs + val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg) + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + //run the method + val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0") + every { netconfSessionSpy.checkAndReestablish() } just Runs + //make sure the future gets resolved + assertTrue { rpcResultFuture.get() == futureMsg } + //make sure that clearReplies wasn't called (reestablishConnection check) + verify(exactly = 0) { netconfSessionSpy.clearReplies() } + } + + @Test + @Ignore + //TODO: get 't' inside asyncRpc to be a Throwable + fun `asyncRpc wraps exception`() { + assertFailsWith(exceptionClass = NetconfException::class, message = futureMsg) { + val netconfSessionSpy = spyk(netconfSession) + val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync { + throw Exception("blah") + } + futureRet.completeExceptionally(IOException("something is wrong")) + every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet + //RUN + val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0") + } + } + + @Test + fun `connect starts underlying client`() { + val propertiesMap = hashMapOf<String, Any>() + every { mockSshClient.start() } just Runs + every { mockSshClient.properties } returns propertiesMap + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs + every { netconfSessionSpy["startSession"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.connect() + verify { mockSshClient.start() } + assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) } + assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) } + } + + @Test + fun `startSession tries to connect to user supplied device`() { + every { mockSshClient.start() } just Runs + every { mockSshClient.properties } returns hashMapOf<String, Any>() + //setup slots to capture values from the invocations + val userSlot = CapturingSlot<String>() + val ipSlot = CapturingSlot<String>() + val portSlot = CapturingSlot<Int>() + //create a future that succeeded + val succeededFuture = DefaultConnectFuture(Any(), Any()) + succeededFuture.value = mockClientSession + every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["authSession"]() as Unit } just Runs + every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + //RUN + netconfSessionSpy.connect() + //Verify + verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } + assertEquals(deviceInfo.username, userSlot.captured) + assertEquals(deviceInfo.ipAddress, ipSlot.captured) + assertEquals(deviceInfo.port, portSlot.captured) + verify { netconfSessionSpy["authSession"]() } + } + + @Test + fun `authSession throws exception if ClientSession is not AUTHED`() { + assertFailsWith(exceptionClass = NetconfException::class) { + //after client session connects, + every { mockSshClient.start() } just Runs + every { mockSshClient.properties } returns hashMapOf<String, Any>() + val succeededAuthFuture = DefaultAuthFuture(Any(), Any()) + succeededAuthFuture.value = true //AuthFuture's value is Boolean + val passSlot = CapturingSlot<String>() + every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs + every { mockClientSession.auth() } returns succeededAuthFuture + val succeededSessionFuture = DefaultConnectFuture(Any(), Any()) + succeededSessionFuture.value = mockClientSession + every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture + every { mockClientSession.waitFor(any(), any()) } returns + setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED) + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + //RUN + netconfSessionSpy.connect() + } + } + + //common mock initializer for more weird tests. + private fun setupOpenChannelMocks(): Unit { + every { mockSshClient.start() } just Runs + every { mockSshClient.properties } returns hashMapOf<String, Any>() + val succeededAuthFuture = DefaultAuthFuture(Any(), Any()) + succeededAuthFuture.value = true //AuthFuture's value is Boolean + val passSlot = CapturingSlot<String>() + every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs + every { mockClientSession.auth() } returns succeededAuthFuture + val succeededSessionFuture = DefaultConnectFuture(Any(), Any()) + succeededSessionFuture.value = mockClientSession + every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture + every { mockClientSession.waitFor(any(), any()) } returns + setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, + ClientSession.ClientSessionEvent.CLOSED, + ClientSession.ClientSessionEvent.AUTHED) + + every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem + every { mockClientChannel.invertedOut } returns sampleInputStream + every { mockClientChannel.invertedIn } returns sampleOutputStream + } + + @Test + fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() { + //after client session connects, make sure the client receives authentication + setupOpenChannelMocks() + val channelFuture = DefaultOpenFuture(Any(), Any()) + channelFuture.value = true + channelFuture.setOpened() + val connectFuture = DefaultConnectFuture(Any(), Any()) + connectFuture.value = mockClientSession + connectFuture.session = mockClientSession + every { mockSubsystem.open() } returns channelFuture + every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture + + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs + every { netconfSessionSpy["setupHandler"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + //Run + netconfSessionSpy.connect() + //Verify + verify { mockSubsystem.open() } + } + + + @Test + fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() { + assertFailsWith(exceptionClass = NetconfException::class) { + //after client session connects, make sure the client receives authentication + setupOpenChannelMocks() + val channelFuture = DefaultOpenFuture(Any(), Any()) + every { mockSubsystem.open() } returns channelFuture + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs + every { netconfSessionSpy["setupHandler"]() as Unit } just Runs + netconfSessionSpy.setClient(mockSshClient) + //Run + netconfSessionSpy.connect() + //Verify + verify { mockSubsystem.open() } + } + } + + + @Test + fun `disconnect closes session, channel, and client`() { + every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE + every { mockClientSession.close() } just Runs + every { mockClientChannel.close() } just Runs + every { mockSshClient.close() } just Runs + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + netconfSessionSpy.setChannel(mockClientChannel) + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.setSession(mockClientSession) + //RUN + netconfSessionSpy.disconnect() + //VERIFY + verify { mockClientSession.close() } + verify { mockClientChannel.close() } + verify { mockSshClient.close() } + } + + @Ignore + @Test + fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others + every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE + every { mockClientSession.close() } just Runs + every { mockClientChannel.close() } throws IOException("channel doesn't want to close!") + val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true) + netconfSessionSpy.setChannel(mockClientChannel) + netconfSessionSpy.setClient(mockSshClient) + netconfSessionSpy.setSession(mockClientSession) + //RUN + netconfSessionSpy.disconnect() + //VERIFY + verify { mockClientSession.close() } + } +} diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt deleted file mode 100644 index 2b7aa76c9..000000000 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright © 2017-2018 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks - - -import org.apache.sshd.common.NamedFactory -import org.apache.sshd.server.command.Command -import org.apache.sshd.server.SshServer -import org.apache.sshd.server.auth.UserAuth -import org.apache.sshd.server.auth.UserAuthNoneFactory -import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider -import java.util.* - - -class NetconfDeviceSimulator(private val port: Int) { - - private var sshd: SshServer? = null - - fun start() { - sshd = SshServer.setUpDefaultServer() - sshd!!.port = port - sshd!!.keyPairProvider = SimpleGeneratorHostKeyProvider() - - val userAuthFactories = ArrayList<NamedFactory<UserAuth>>() - userAuthFactories.add(UserAuthNoneFactory()) - sshd!!.userAuthFactories = userAuthFactories - - val namedFactoryList = ArrayList<NamedFactory<Command>>() - namedFactoryList.add(NetconfSubsystemFactory()) - sshd!!.subsystemFactories = namedFactoryList - - try { - sshd!!.start() - } catch (e: Exception) { - e.printStackTrace() - } - - } - - fun stop() { - try { - sshd!!.stop(true) - } catch (e: Exception) { - e.printStackTrace() - } - - } -}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt deleted file mode 100644 index f3e5d382b..000000000 --- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright © 2017-2018 AT&T Intellectual Property. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks - - -import java.io.IOException -import java.io.InputStream -import java.io.OutputStream -import org.apache.sshd.common.NamedFactory; -import org.apache.sshd.server.command.Command; -import org.apache.sshd.server.Environment; -import org.apache.sshd.server.ExitCallback; - - -class NetconfSubsystemFactory : NamedFactory<Command> { - - private val END_CHAR_SEQUENCE = "]]>]]>" - - override fun create(): Command { - return NetconfSubsystem() - } - - override fun getName(): String { - return "netconf" - } - - /** - * Simple implementation of netconf reading 1 request, sending a 'hello' response and quitting - */ - inner class NetconfSubsystem : Command { - private var input: InputStream? = null - private var out: OutputStream? = null - private var clientThread: Thread? = null - private var r: Int = 0 - - @Throws(IOException::class) - override fun start(env: Environment) { - clientThread = Thread(object : Runnable { - - override fun run() { - try { - val message = StringBuilder() - while (true) { - process(createHelloString()) - r = input!!.read() - if (r == -1) { - break - } else { - val c = r.toChar() - message.append(c) - val messageString = message.toString() - if (messageString.endsWith(END_CHAR_SEQUENCE)) { - println("Detected end message:\n$messageString") - process(createHelloString()) - message.setLength(0) - break - } - } - } - } catch (e: IOException) { - e.printStackTrace() - } - - } - - @Throws(IOException::class) - private fun process(xmlMessage: String) { - println("Sending message:\n$xmlMessage") - out!!.write(xmlMessage.toByteArray(charset("UTF-8"))) - out!!.write((END_CHAR_SEQUENCE + "\n").toByteArray(charset("UTF-8"))) - out!!.flush() - } - - private fun createHelloString(): String { - val sessionId = "" + (Math.random() * Integer.MAX_VALUE).toInt() - return ("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n" - + "<capabilities>\n<capability>urn:ietf:params:netconf:base:1.0</capability>\n" - + "<capability>urn:ietf:params:netconf:base:1.1</capability>\n</capabilities>\n" - + "<session-id>" + sessionId + "</session-id>\n</hello>") - } - }) - - clientThread!!.start() - } - - @Throws(Exception::class) - override fun destroy() { - try { - clientThread!!.join(2000) - } catch (e: InterruptedException) { - // log.warn("Error joining Client thread" + e.getMessage()); - } - - clientThread!!.interrupt() - } - - override fun setInputStream(input: InputStream) { - this.input = input - } - - override fun setOutputStream(out: OutputStream) { - this.out = out - } - - override fun setErrorStream(err: OutputStream) {} - - override fun setExitCallback(callback: ExitCallback) {} - - - - } -}
\ No newline at end of file |