X-FTP
[x93.git/.git] / xftp.git / ext / pyftpdlib / test / test_functional_ssl.py
diff --git a/xftp.git/ext/pyftpdlib/test/test_functional_ssl.py b/xftp.git/ext/pyftpdlib/test/test_functional_ssl.py
new file mode 100644 (file)
index 0000000..bff7501
--- /dev/null
@@ -0,0 +1,360 @@
+#!/usr/bin/env python\r
+\r
+# Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.\r
+# Use of this source code is governed by MIT license that can be\r
+# found in the LICENSE file.\r
+\r
+import contextlib\r
+import ftplib\r
+import os\r
+import socket\r
+import sys\r
+import ssl\r
+\r
+import OpenSSL  # requires "pip install pyopenssl"\r
+\r
+from pyftpdlib.handlers import TLS_FTPHandler\r
+from pyftpdlib.test import close_client\r
+from pyftpdlib.test import configure_logging\r
+from pyftpdlib.test import MProcessTestFTPd\r
+from pyftpdlib.test import OSX\r
+from pyftpdlib.test import PASSWD\r
+from pyftpdlib.test import TestCase\r
+from pyftpdlib.test import TIMEOUT\r
+from pyftpdlib.test import TRAVIS\r
+from pyftpdlib.test import unittest\r
+from pyftpdlib.test import USER\r
+from pyftpdlib.test import VERBOSITY\r
+from pyftpdlib.test.test_functional import TestConfigurableOptions\r
+from pyftpdlib.test.test_functional import TestCornerCases\r
+from pyftpdlib.test.test_functional import TestFtpAbort\r
+from pyftpdlib.test.test_functional import TestFtpAuthentication\r
+from pyftpdlib.test.test_functional import TestFtpCmdsSemantic\r
+from pyftpdlib.test.test_functional import TestFtpDummyCmds\r
+from pyftpdlib.test.test_functional import TestFtpFsOperations\r
+from pyftpdlib.test.test_functional import TestFtpListingCmds\r
+from pyftpdlib.test.test_functional import TestFtpRetrieveData\r
+from pyftpdlib.test.test_functional import TestFtpStoreData\r
+from pyftpdlib.test.test_functional import TestIPv4Environment\r
+from pyftpdlib.test.test_functional import TestIPv6Environment\r
+from pyftpdlib.test.test_functional import TestTimeouts\r
+\r
+\r
+FTPS_SUPPORT = hasattr(ftplib, 'FTP_TLS')\r
+if sys.version_info < (2, 7):\r
+    FTPS_UNSUPPORT_REASON = "requires python 2.7+"\r
+else:\r
+    FTPS_UNSUPPORT_REASON = "FTPS test skipped"\r
+\r
+CERTFILE = os.path.abspath(os.path.join(os.path.dirname(__file__),\r
+                                        'keycert.pem'))\r
+\r
+del OpenSSL\r
+\r
+# =====================================================================\r
+# --- FTPS mixin tests\r
+# =====================================================================\r
+\r
+# What we're going to do here is repeat the original functional tests\r
+# defined in test_functinal.py but by using FTPS.\r
+# we secure both control and data connections before running any test.\r
+# This is useful as we reuse the existent functional tests which are\r
+# supposed to work no matter if the underlying protocol is FTP or FTPS.\r
+\r
+\r
+if FTPS_SUPPORT:\r
+    class FTPSClient(ftplib.FTP_TLS):\r
+        """A modified version of ftplib.FTP_TLS class which implicitly\r
+        secure the data connection after login().\r
+        """\r
+\r
+        def login(self, *args, **kwargs):\r
+            ftplib.FTP_TLS.login(self, *args, **kwargs)\r
+            self.prot_p()\r
+\r
+    class FTPSServer(MProcessTestFTPd):\r
+        """A threaded FTPS server used for functional testing."""\r
+        handler = TLS_FTPHandler\r
+        handler.certfile = CERTFILE\r
+\r
+    class TLSTestMixin:\r
+        server_class = FTPSServer\r
+        client_class = FTPSClient\r
+else:\r
+    @unittest.skipIf(True, FTPS_UNSUPPORT_REASON)\r
+    class TLSTestMixin:\r
+        pass\r
+\r
+\r
+class TestFtpAuthenticationTLSMixin(TLSTestMixin, TestFtpAuthentication):\r
+    pass\r
+\r
+\r
+class TestTFtpDummyCmdsTLSMixin(TLSTestMixin, TestFtpDummyCmds):\r
+    pass\r
+\r
+\r
+class TestFtpCmdsSemanticTLSMixin(TLSTestMixin, TestFtpCmdsSemantic):\r
+    pass\r
+\r
+\r
+class TestFtpFsOperationsTLSMixin(TLSTestMixin, TestFtpFsOperations):\r
+    pass\r
+\r
+\r
+class TestFtpStoreDataTLSMixin(TLSTestMixin, TestFtpStoreData):\r
+\r
+    @unittest.skipIf(1, "fails with SSL")\r
+    def test_stou(self):\r
+        pass\r
+\r
+\r
+# class TestSendFileTLSMixin(TLSTestMixin, TestSendfile):\r
+\r
+#     def test_fallback(self):\r
+#         self.client.prot_c()\r
+#         super(TestSendFileTLSMixin, self).test_fallback()\r
+\r
+\r
+class TestFtpRetrieveDataTLSMixin(TLSTestMixin, TestFtpRetrieveData):\r
+\r
+    @unittest.skipIf(os.name == 'nt', "may fail on windows")\r
+    def test_restore_on_retr(self):\r
+        super(TestFtpRetrieveDataTLSMixin, self).test_restore_on_retr()\r
+\r
+\r
+class TestFtpListingCmdsTLSMixin(TLSTestMixin, TestFtpListingCmds):\r
+\r
+    # TODO: see https://travis-ci.org/giampaolo/pyftpdlib/jobs/87318445\r
+    # Fails with:\r
+    # File "/opt/python/2.7.9/lib/python2.7/ftplib.py", line 735, in retrlines\r
+    #    conn.unwrap()\r
+    # File "/opt/python/2.7.9/lib/python2.7/ssl.py", line 771, in unwrap\r
+    #    s = self._sslobj.shutdown()\r
+    # error: [Errno 0] Error\r
+    @unittest.skipIf(TRAVIS or os.name == 'nt', "may fail on travis/windows")\r
+    def test_nlst(self):\r
+        super(TestFtpListingCmdsTLSMixin, self).test_nlst()\r
+\r
+\r
+class TestFtpAbortTLSMixin(TLSTestMixin, TestFtpAbort):\r
+\r
+    @unittest.skipIf(1, "fails with SSL")\r
+    def test_oob_abor(self):\r
+        pass\r
+\r
+\r
+class TestTimeoutsTLSMixin(TLSTestMixin, TestTimeouts):\r
+\r
+    @unittest.skipIf(1, "fails with SSL")\r
+    def test_data_timeout_not_reached(self):\r
+        pass\r
+\r
+\r
+class TestConfigurableOptionsTLSMixin(TLSTestMixin, TestConfigurableOptions):\r
+    pass\r
+\r
+\r
+class TestIPv4EnvironmentTLSMixin(TLSTestMixin, TestIPv4Environment):\r
+    pass\r
+\r
+\r
+class TestIPv6EnvironmentTLSMixin(TLSTestMixin, TestIPv6Environment):\r
+    pass\r
+\r
+\r
+class TestCornerCasesTLSMixin(TLSTestMixin, TestCornerCases):\r
+    pass\r
+\r
+\r
+# =====================================================================\r
+# dedicated FTPS tests\r
+# =====================================================================\r
+\r
+\r
+@unittest.skipUnless(FTPS_SUPPORT, FTPS_UNSUPPORT_REASON)\r
+class TestFTPS(TestCase):\r
+    """Specific tests fot TSL_FTPHandler class."""\r
+\r
+    def _setup(self,\r
+               tls_control_required=False,\r
+               tls_data_required=False,\r
+               ssl_protocol=ssl.PROTOCOL_SSLv23,\r
+               ):\r
+        self.server = FTPSServer()\r
+        self.server.handler.tls_control_required = tls_control_required\r
+        self.server.handler.tls_data_required = tls_data_required\r
+        self.server.handler.ssl_protocol = ssl_protocol\r
+        self.server.start()\r
+        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)\r
+        self.client.connect(self.server.host, self.server.port)\r
+\r
+    def setUp(self):\r
+        self.client = None\r
+        self.server = None\r
+\r
+    def tearDown(self):\r
+        if self.client is not None:\r
+            self.client.ssl_version = ssl.PROTOCOL_SSLv23\r
+            close_client(self.client)\r
+        if self.server is not None:\r
+            self.server.handler.ssl_protocol = ssl.PROTOCOL_SSLv23\r
+            self.server.handler.tls_control_required = False\r
+            self.server.handler.tls_data_required = False\r
+            self.server.stop()\r
+\r
+    def assertRaisesWithMsg(self, excClass, msg, callableObj, *args, **kwargs):\r
+        try:\r
+            callableObj(*args, **kwargs)\r
+        except excClass as err:\r
+            if str(err) == msg:\r
+                return\r
+            raise self.failureException("%s != %s" % (str(err), msg))\r
+        else:\r
+            if hasattr(excClass, '__name__'):\r
+                excName = excClass.__name__\r
+            else:\r
+                excName = str(excClass)\r
+            raise self.failureException("%s not raised" % excName)\r
+\r
+    def test_auth(self):\r
+        # unsecured\r
+        self._setup()\r
+        self.client.login(secure=False)\r
+        self.assertFalse(isinstance(self.client.sock, ssl.SSLSocket))\r
+        # secured\r
+        self.client.login()\r
+        self.assertTrue(isinstance(self.client.sock, ssl.SSLSocket))\r
+        # AUTH issued twice\r
+        msg = '503 Already using TLS.'\r
+        self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+                                 self.client.sendcmd, 'auth tls')\r
+\r
+    def test_pbsz(self):\r
+        # unsecured\r
+        self._setup()\r
+        self.client.login(secure=False)\r
+        msg = "503 PBSZ not allowed on insecure control connection."\r
+        self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+                                 self.client.sendcmd, 'pbsz 0')\r
+        # secured\r
+        self.client.login(secure=True)\r
+        resp = self.client.sendcmd('pbsz 0')\r
+        self.assertEqual(resp, "200 PBSZ=0 successful.")\r
+\r
+    def test_prot(self):\r
+        self._setup()\r
+        self.client.login(secure=False)\r
+        msg = "503 PROT not allowed on insecure control connection."\r
+        self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+                                 self.client.sendcmd, 'prot p')\r
+        self.client.login(secure=True)\r
+        # secured\r
+        self.client.prot_p()\r
+        sock = self.client.transfercmd('list')\r
+        with contextlib.closing(sock):\r
+            while True:\r
+                if not sock.recv(1024):\r
+                    self.client.voidresp()\r
+                    break\r
+            self.assertTrue(isinstance(sock, ssl.SSLSocket))\r
+            # unsecured\r
+            self.client.prot_c()\r
+        sock = self.client.transfercmd('list')\r
+        with contextlib.closing(sock):\r
+            while True:\r
+                if not sock.recv(1024):\r
+                    self.client.voidresp()\r
+                    break\r
+            self.assertFalse(isinstance(sock, ssl.SSLSocket))\r
+\r
+    def test_feat(self):\r
+        self._setup()\r
+        feat = self.client.sendcmd('feat')\r
+        cmds = ['AUTH TLS', 'AUTH SSL', 'PBSZ', 'PROT']\r
+        for cmd in cmds:\r
+            self.assertTrue(cmd in feat)\r
+\r
+    def test_unforseen_ssl_shutdown(self):\r
+        self._setup()\r
+        self.client.login()\r
+        try:\r
+            sock = self.client.sock.unwrap()\r
+        except socket.error as err:\r
+            if err.errno == 0:\r
+                return\r
+            raise\r
+        sock.settimeout(TIMEOUT)\r
+        sock.sendall(b'noop')\r
+        try:\r
+            chunk = sock.recv(1024)\r
+        except socket.error:\r
+            pass\r
+        else:\r
+            self.assertEqual(chunk, b"")\r
+\r
+    def test_tls_control_required(self):\r
+        self._setup(tls_control_required=True)\r
+        msg = "550 SSL/TLS required on the control channel."\r
+        self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+                                 self.client.sendcmd, "user " + USER)\r
+        self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+                                 self.client.sendcmd, "pass " + PASSWD)\r
+        self.client.login(secure=True)\r
+\r
+    def test_tls_data_required(self):\r
+        self._setup(tls_data_required=True)\r
+        self.client.login(secure=True)\r
+        msg = "550 SSL/TLS required on the data channel."\r
+        self.assertRaisesWithMsg(ftplib.error_perm, msg,\r
+                                 self.client.retrlines, 'list', lambda x: x)\r
+        self.client.prot_p()\r
+        self.client.retrlines('list', lambda x: x)\r
+\r
+    def try_protocol_combo(self, server_protocol, client_protocol):\r
+        self._setup(ssl_protocol=server_protocol)\r
+        self.client.ssl_version = client_protocol\r
+        close_client(self.client)\r
+        self.client.connect(self.server.host, self.server.port)\r
+        try:\r
+            self.client.login()\r
+        except (ssl.SSLError, socket.error):\r
+            self.client.close()\r
+        else:\r
+            self.client.quit()\r
+\r
+    # def test_ssl_version(self):\r
+    #     protos = [ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,\r
+    #               ssl.PROTOCOL_TLSv1]\r
+    #     if hasattr(ssl, "PROTOCOL_SSLv2"):\r
+    #         protos.append(ssl.PROTOCOL_SSLv2)\r
+    #         for proto in protos:\r
+    #             self.try_protocol_combo(ssl.PROTOCOL_SSLv2, proto)\r
+    #     for proto in protos:\r
+    #         self.try_protocol_combo(ssl.PROTOCOL_SSLv3, proto)\r
+    #     for proto in protos:\r
+    #         self.try_protocol_combo(ssl.PROTOCOL_SSLv23, proto)\r
+    #     for proto in protos:\r
+    #         self.try_protocol_combo(ssl.PROTOCOL_TLSv1, proto)\r
+\r
+    if hasattr(ssl, "PROTOCOL_SSLv2"):\r
+        def test_sslv2(self):\r
+            self.client.ssl_version = ssl.PROTOCOL_SSLv2\r
+            close_client(self.client)\r
+            if not OSX:\r
+                with self.server.lock:\r
+                    self.client.connect(self.server.host, self.server.port)\r
+                self.assertRaises(socket.error, self.client.login)\r
+            else:\r
+                with self.server.lock:\r
+                    with self.assertRaises(socket.error):\r
+                        self.client.connect(self.server.host, self.server.port,\r
+                                            timeout=0.1)\r
+            self.client.ssl_version = ssl.PROTOCOL_SSLv2\r
+\r
+\r
+configure_logging()\r
+\r
+\r
+if __name__ == '__main__':\r
+    unittest.main(verbosity=VERBOSITY)\r