--- /dev/null
+#!/usr/bin/env python\r
+\r
+# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.\r
+# Use of this source code is governed by MIT license that can be\r
+# found in the LICENSE file.\r
+\r
+import contextlib\r
+import ftplib\r
+import os\r
+import socket\r
+import sys\r
+import ssl\r
+\r
+import OpenSSL # requires "pip install pyopenssl"\r
+\r
+from pyftpdlib.handlers import TLS_FTPHandler\r
+from pyftpdlib.test import close_client\r
+from pyftpdlib.test import configure_logging\r
+from pyftpdlib.test import MProcessTestFTPd\r
+from pyftpdlib.test import OSX\r
+from pyftpdlib.test import PASSWD\r
+from pyftpdlib.test import TestCase\r
+from pyftpdlib.test import TIMEOUT\r
+from pyftpdlib.test import TRAVIS\r
+from pyftpdlib.test import unittest\r
+from pyftpdlib.test import USER\r
+from pyftpdlib.test import VERBOSITY\r
+from pyftpdlib.test.test_functional import TestConfigurableOptions\r
+from pyftpdlib.test.test_functional import TestCornerCases\r
+from pyftpdlib.test.test_functional import TestFtpAbort\r
+from pyftpdlib.test.test_functional import TestFtpAuthentication\r
+from pyftpdlib.test.test_functional import TestFtpCmdsSemantic\r
+from pyftpdlib.test.test_functional import TestFtpDummyCmds\r
+from pyftpdlib.test.test_functional import TestFtpFsOperations\r
+from pyftpdlib.test.test_functional import TestFtpListingCmds\r
+from pyftpdlib.test.test_functional import TestFtpRetrieveData\r
+from pyftpdlib.test.test_functional import TestFtpStoreData\r
+from pyftpdlib.test.test_functional import TestIPv4Environment\r
+from pyftpdlib.test.test_functional import TestIPv6Environment\r
+from pyftpdlib.test.test_functional import TestTimeouts\r
+\r
+\r
+FTPS_SUPPORT = hasattr(ftplib, 'FTP_TLS')\r
+if sys.version_info < (2, 7):\r
+ FTPS_UNSUPPORT_REASON = "requires python 2.7+"\r
+else:\r
+ FTPS_UNSUPPORT_REASON = "FTPS test skipped"\r
+\r
+CERTFILE = os.path.abspath(os.path.join(os.path.dirname(__file__),\r
+ 'keycert.pem'))\r
+\r
+del OpenSSL\r
+\r
+# =====================================================================\r
+# --- FTPS mixin tests\r
+# =====================================================================\r
+\r
+# What we're going to do here is repeat the original functional tests\r
+# defined in test_functinal.py but by using FTPS.\r
+# we secure both control and data connections before running any test.\r
+# This is useful as we reuse the existent functional tests which are\r
+# supposed to work no matter if the underlying protocol is FTP or FTPS.\r
+\r
+\r
+if FTPS_SUPPORT:\r
+ class FTPSClient(ftplib.FTP_TLS):\r
+ """A modified version of ftplib.FTP_TLS class which implicitly\r
+ secure the data connection after login().\r
+ """\r
+\r
+ def login(self, *args, **kwargs):\r
+ ftplib.FTP_TLS.login(self, *args, **kwargs)\r
+ self.prot_p()\r
+\r
+ class FTPSServer(MProcessTestFTPd):\r
+ """A threaded FTPS server used for functional testing."""\r
+ handler = TLS_FTPHandler\r
+ handler.certfile = CERTFILE\r
+\r
+ class TLSTestMixin:\r
+ server_class = FTPSServer\r
+ client_class = FTPSClient\r
+else:\r
+ @unittest.skipIf(True, FTPS_UNSUPPORT_REASON)\r
+ class TLSTestMixin:\r
+ pass\r
+\r
+\r
+class TestFtpAuthenticationTLSMixin(TLSTestMixin, TestFtpAuthentication):\r
+ pass\r
+\r
+\r
+class TestTFtpDummyCmdsTLSMixin(TLSTestMixin, TestFtpDummyCmds):\r
+ pass\r
+\r
+\r
+class TestFtpCmdsSemanticTLSMixin(TLSTestMixin, TestFtpCmdsSemantic):\r
+ pass\r
+\r
+\r
+class TestFtpFsOperationsTLSMixin(TLSTestMixin, TestFtpFsOperations):\r
+ pass\r
+\r
+\r
+class TestFtpStoreDataTLSMixin(TLSTestMixin, TestFtpStoreData):\r
+\r
+ @unittest.skipIf(1, "fails with SSL")\r
+ def test_stou(self):\r
+ pass\r
+\r
+\r
+# class TestSendFileTLSMixin(TLSTestMixin, TestSendfile):\r
+\r
+# def test_fallback(self):\r
+# self.client.prot_c()\r
+# super(TestSendFileTLSMixin, self).test_fallback()\r
+\r
+\r
+class TestFtpRetrieveDataTLSMixin(TLSTestMixin, TestFtpRetrieveData):\r
+\r
+ @unittest.skipIf(os.name == 'nt', "may fail on windows")\r
+ def test_restore_on_retr(self):\r
+ super(TestFtpRetrieveDataTLSMixin, self).test_restore_on_retr()\r
+\r
+\r
+class TestFtpListingCmdsTLSMixin(TLSTestMixin, TestFtpListingCmds):\r
+\r
+ # TODO: see https://travis-ci.org/giampaolo/pyftpdlib/jobs/87318445\r
+ # Fails with:\r
+ # File "/opt/python/2.7.9/lib/python2.7/ftplib.py", line 735, in retrlines\r
+ # conn.unwrap()\r
+ # File "/opt/python/2.7.9/lib/python2.7/ssl.py", line 771, in unwrap\r
+ # s = self._sslobj.shutdown()\r
+ # error: [Errno 0] Error\r
+ @unittest.skipIf(TRAVIS or os.name == 'nt', "may fail on travis/windows")\r
+ def test_nlst(self):\r
+ super(TestFtpListingCmdsTLSMixin, self).test_nlst()\r
+\r
+\r
+class TestFtpAbortTLSMixin(TLSTestMixin, TestFtpAbort):\r
+\r
+ @unittest.skipIf(1, "fails with SSL")\r
+ def test_oob_abor(self):\r
+ pass\r
+\r
+\r
+class TestTimeoutsTLSMixin(TLSTestMixin, TestTimeouts):\r
+\r
+ @unittest.skipIf(1, "fails with SSL")\r
+ def test_data_timeout_not_reached(self):\r
+ pass\r
+\r
+\r
+class TestConfigurableOptionsTLSMixin(TLSTestMixin, TestConfigurableOptions):\r
+ pass\r
+\r
+\r
+class TestIPv4EnvironmentTLSMixin(TLSTestMixin, TestIPv4Environment):\r
+ pass\r
+\r
+\r
+class TestIPv6EnvironmentTLSMixin(TLSTestMixin, TestIPv6Environment):\r
+ pass\r
+\r
+\r
+class TestCornerCasesTLSMixin(TLSTestMixin, TestCornerCases):\r
+ pass\r
+\r
+\r
+# =====================================================================\r
+# dedicated FTPS tests\r
+# =====================================================================\r
+\r
+\r
+@unittest.skipUnless(FTPS_SUPPORT, FTPS_UNSUPPORT_REASON)\r
+class TestFTPS(TestCase):\r
+ """Specific tests fot TSL_FTPHandler class."""\r
+\r
+ def _setup(self,\r
+ tls_control_required=False,\r
+ tls_data_required=False,\r
+ ssl_protocol=ssl.PROTOCOL_SSLv23,\r
+ ):\r
+ self.server = FTPSServer()\r
+ self.server.handler.tls_control_required = tls_control_required\r
+ self.server.handler.tls_data_required = tls_data_required\r
+ self.server.handler.ssl_protocol = ssl_protocol\r
+ self.server.start()\r
+ self.client = ftplib.FTP_TLS(timeout=TIMEOUT)\r
+ self.client.connect(self.server.host, self.server.port)\r
+\r
+ def setUp(self):\r
+ self.client = None\r
+ self.server = None\r
+\r
+ def tearDown(self):\r
+ if self.client is not None:\r
+ self.client.ssl_version = ssl.PROTOCOL_SSLv23\r
+ close_client(self.client)\r
+ if self.server is not None:\r
+ self.server.handler.ssl_protocol = ssl.PROTOCOL_SSLv23\r
+ self.server.handler.tls_control_required = False\r
+ self.server.handler.tls_data_required = False\r
+ self.server.stop()\r
+\r
+ def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):\r
+ try:\r
+ callableObj(*args, **kwargs)\r
+ except excClass as err:\r
+ if str(err) == msg:\r
+ return\r
+ raise self.failureException("%s != %s" % (str(err), msg))\r
+ else:\r
+ if hasattr(excClass, '__name__'):\r
+ excName = excClass.__name__\r
+ else:\r
+ excName = str(excClass)\r
+ raise self.failureException("%s not raised" % excName)\r
+\r
+ def test_auth(self):\r
+ # unsecured\r
+ self._setup()\r
+ self.client.login(secure=False)\r
+ self.assertFalse(isinstance(self.client.sock, ssl.SSLSocket))\r
+ # secured\r
+ self.client.login()\r
+ self.assertTrue(isinstance(self.client.sock, ssl.SSLSocket))\r
+ # AUTH issued twice\r
+ msg = '503 Already using TLS.'\r
+ self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+ self.client.sendcmd, 'auth tls')\r
+\r
+ def test_pbsz(self):\r
+ # unsecured\r
+ self._setup()\r
+ self.client.login(secure=False)\r
+ msg = "503 PBSZ not allowed on insecure control connection."\r
+ self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+ self.client.sendcmd, 'pbsz 0')\r
+ # secured\r
+ self.client.login(secure=True)\r
+ resp = self.client.sendcmd('pbsz 0')\r
+ self.assertEqual(resp, "200 PBSZ=0 successful.")\r
+\r
+ def test_prot(self):\r
+ self._setup()\r
+ self.client.login(secure=False)\r
+ msg = "503 PROT not allowed on insecure control connection."\r
+ self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+ self.client.sendcmd, 'prot p')\r
+ self.client.login(secure=True)\r
+ # secured\r
+ self.client.prot_p()\r
+ sock = self.client.transfercmd('list')\r
+ with contextlib.closing(sock):\r
+ while True:\r
+ if not sock.recv(1024):\r
+ self.client.voidresp()\r
+ break\r
+ self.assertTrue(isinstance(sock, ssl.SSLSocket))\r
+ # unsecured\r
+ self.client.prot_c()\r
+ sock = self.client.transfercmd('list')\r
+ with contextlib.closing(sock):\r
+ while True:\r
+ if not sock.recv(1024):\r
+ self.client.voidresp()\r
+ break\r
+ self.assertFalse(isinstance(sock, ssl.SSLSocket))\r
+\r
+ def test_feat(self):\r
+ self._setup()\r
+ feat = self.client.sendcmd('feat')\r
+ cmds = ['AUTH TLS', 'AUTH SSL', 'PBSZ', 'PROT']\r
+ for cmd in cmds:\r
+ self.assertTrue(cmd in feat)\r
+\r
+ def test_unforseen_ssl_shutdown(self):\r
+ self._setup()\r
+ self.client.login()\r
+ try:\r
+ sock = self.client.sock.unwrap()\r
+ except socket.error as err:\r
+ if err.errno == 0:\r
+ return\r
+ raise\r
+ sock.settimeout(TIMEOUT)\r
+ sock.sendall(b'noop')\r
+ try:\r
+ chunk = sock.recv(1024)\r
+ except socket.error:\r
+ pass\r
+ else:\r
+ self.assertEqual(chunk, b"")\r
+\r
+ def test_tls_control_required(self):\r
+ self._setup(tls_control_required=True)\r
+ msg = "550 SSL/TLS required on the control channel."\r
+ self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+ self.client.sendcmd, "user " + USER)\r
+ self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+ self.client.sendcmd, "pass " + PASSWD)\r
+ self.client.login(secure=True)\r
+\r
+ def test_tls_data_required(self):\r
+ self._setup(tls_data_required=True)\r
+ self.client.login(secure=True)\r
+ msg = "550 SSL/TLS required on the data channel."\r
+ self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+ self.client.retrlines, 'list', lambda x: x)\r
+ self.client.prot_p()\r
+ self.client.retrlines('list', lambda x: x)\r
+\r
+ def try_protocol_combo(self, server_protocol, client_protocol):\r
+ self._setup(ssl_protocol=server_protocol)\r
+ self.client.ssl_version = client_protocol\r
+ close_client(self.client)\r
+ self.client.connect(self.server.host, self.server.port)\r
+ try:\r
+ self.client.login()\r
+ except (ssl.SSLError, socket.error):\r
+ self.client.close()\r
+ else:\r
+ self.client.quit()\r
+\r
+ # def test_ssl_version(self):\r
+ # protos = [ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,\r
+ # ssl.PROTOCOL_TLSv1]\r
+ # if hasattr(ssl, "PROTOCOL_SSLv2"):\r
+ # protos.append(ssl.PROTOCOL_SSLv2)\r
+ # for proto in protos:\r
+ # self.try_protocol_combo(ssl.PROTOCOL_SSLv2, proto)\r
+ # for proto in protos:\r
+ # self.try_protocol_combo(ssl.PROTOCOL_SSLv3, proto)\r
+ # for proto in protos:\r
+ # self.try_protocol_combo(ssl.PROTOCOL_SSLv23, proto)\r
+ # for proto in protos:\r
+ # self.try_protocol_combo(ssl.PROTOCOL_TLSv1, proto)\r
+\r
+ if hasattr(ssl, "PROTOCOL_SSLv2"):\r
+ def test_sslv2(self):\r
+ self.client.ssl_version = ssl.PROTOCOL_SSLv2\r
+ close_client(self.client)\r
+ if not OSX:\r
+ with self.server.lock:\r
+ self.client.connect(self.server.host, self.server.port)\r
+ self.assertRaises(socket.error, self.client.login)\r
+ else:\r
+ with self.server.lock:\r
+ with self.assertRaises(socket.error):\r
+ self.client.connect(self.server.host, self.server.port,\r
+ timeout=0.1)\r
+ self.client.ssl_version = ssl.PROTOCOL_SSLv2\r
+\r
+\r
+configure_logging()\r
+\r
+\r
+if __name__ == '__main__':\r
+ unittest.main(verbosity=VERBOSITY)\r