X-FTP
[x93.git/.git] / xftp.git / ext / pyftpdlib / test / test_ioloop.py
1 #!/usr/bin/env python\r
2 \r
3 # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.\r
4 # Use of this source code is governed by MIT license that can be\r
5 # found in the LICENSE file.\r
6 \r
7 import contextlib\r
8 import errno\r
9 import select\r
10 import socket\r
11 import sys\r
12 import time\r
13 \r
14 from pyftpdlib._compat import PY3\r
15 from pyftpdlib.ioloop import Acceptor\r
16 from pyftpdlib.ioloop import AsyncChat\r
17 from pyftpdlib.ioloop import IOLoop\r
18 from pyftpdlib.ioloop import RetryError\r
19 from pyftpdlib.test import mock\r
20 from pyftpdlib.test import POSIX\r
21 from pyftpdlib.test import TestCase\r
22 from pyftpdlib.test import unittest\r
23 from pyftpdlib.test import VERBOSITY\r
24 import pyftpdlib.ioloop\r
25 \r
26 \r
27 if hasattr(socket, 'socketpair'):\r
28     socketpair = socket.socketpair\r
29 else:\r
30     def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):\r
31         with contextlib.closing(socket.socket(family, type, proto)) as l:\r
32             l.bind(("localhost", 0))\r
33             l.listen(5)\r
34             c = socket.socket(family, type, proto)\r
35             try:\r
36                 c.connect(l.getsockname())\r
37                 caddr = c.getsockname()\r
38                 while True:\r
39                     a, addr = l.accept()\r
40                     # check that we've got the correct client\r
41                     if addr == caddr:\r
42                         return c, a\r
43                     a.close()\r
44             except OSError:\r
45                 c.close()\r
46                 raise\r
47 \r
48 \r
49 # TODO: write more tests.\r
50 class BaseIOLoopTestCase(object):\r
51 \r
52     ioloop_class = None\r
53 \r
54     def make_socketpair(self):\r
55         rd, wr = socketpair()\r
56         self.addCleanup(rd.close)\r
57         self.addCleanup(wr.close)\r
58         return rd, wr\r
59 \r
60     def test_register(self):\r
61         s = self.ioloop_class()\r
62         self.addCleanup(s.close)\r
63         rd, wr = self.make_socketpair()\r
64         handler = AsyncChat(rd)\r
65         s.register(rd, handler, s.READ)\r
66         s.register(wr, handler, s.WRITE)\r
67         self.assertIn(rd, s.socket_map)\r
68         self.assertIn(wr, s.socket_map)\r
69         return (s, rd, wr)\r
70 \r
71     def test_unregister(self):\r
72         s, rd, wr = self.test_register()\r
73         s.unregister(rd)\r
74         s.unregister(wr)\r
75         self.assertNotIn(rd, s.socket_map)\r
76         self.assertNotIn(wr, s.socket_map)\r
77 \r
78     def test_unregister_twice(self):\r
79         s, rd, wr = self.test_register()\r
80         s.unregister(rd)\r
81         s.unregister(rd)\r
82         s.unregister(wr)\r
83         s.unregister(wr)\r
84 \r
85     def test_modify(self):\r
86         s, rd, wr = self.test_register()\r
87         s.modify(rd, s.WRITE)\r
88         s.modify(wr, s.READ)\r
89 \r
90     def test_loop(self):\r
91         # no timeout\r
92         s, rd, wr = self.test_register()\r
93         s.call_later(0, s.close)\r
94         s.loop()\r
95         # with timeout\r
96         s, rd, wr = self.test_register()\r
97         s.call_later(0, s.close)\r
98         s.loop(timeout=0.001)\r
99 \r
100     def test_close(self):\r
101         s, rd, wr = self.test_register()\r
102         s.close()\r
103         self.assertEqual(s.socket_map, {})\r
104 \r
105     def test_close_w_handler_exc(self):\r
106         # Simulate an exception when close()ing a socket handler.\r
107         # Exception should be logged and ignored.\r
108         class Handler(AsyncChat):\r
109 \r
110             def close(self):\r
111                 1 / 0\r
112 \r
113         s = self.ioloop_class()\r
114         self.addCleanup(s.close)\r
115         rd, wr = self.make_socketpair()\r
116         handler = Handler(rd)\r
117         s.register(rd, handler, s.READ)\r
118         with mock.patch("pyftpdlib.ioloop.logger.error") as m:\r
119             s.close()\r
120             assert m.called\r
121             self.assertIn('ZeroDivisionError', m.call_args[0][0])\r
122 \r
123     def test_close_w_handler_ebadf_exc(self):\r
124         # Simulate an exception when close()ing a socket handler.\r
125         # Exception should be ignored (and not logged).\r
126         class Handler(AsyncChat):\r
127 \r
128             def close(self):\r
129                 raise OSError(errno.EBADF, "")\r
130 \r
131         s = self.ioloop_class()\r
132         self.addCleanup(s.close)\r
133         rd, wr = self.make_socketpair()\r
134         handler = Handler(rd)\r
135         s.register(rd, handler, s.READ)\r
136         with mock.patch("pyftpdlib.ioloop.logger.error") as m:\r
137             s.close()\r
138             assert not m.called\r
139 \r
140     def test_close_w_callback_exc(self):\r
141         # Simulate an exception when close()ing the IO loop and a\r
142         # scheduled callback raises an exception on cancel().\r
143         with mock.patch("pyftpdlib.ioloop.logger.error") as logerr:\r
144             with mock.patch("pyftpdlib.ioloop._CallLater.cancel",\r
145                             side_effect=lambda: 1 / 0) as cancel:\r
146                 s = self.ioloop_class()\r
147                 self.addCleanup(s.close)\r
148                 s.call_later(1, lambda: 0)\r
149                 s.close()\r
150                 assert cancel.called\r
151                 assert logerr.called\r
152                 self.assertIn('ZeroDivisionError', logerr.call_args[0][0])\r
153 \r
154 \r
155 class DefaultIOLoopTestCase(TestCase, BaseIOLoopTestCase):\r
156     ioloop_class = pyftpdlib.ioloop.IOLoop\r
157 \r
158 \r
159 # ===================================================================\r
160 # select()\r
161 # ===================================================================\r
162 \r
163 class SelectIOLoopTestCase(TestCase, BaseIOLoopTestCase):\r
164     ioloop_class = pyftpdlib.ioloop.Select\r
165 \r
166     def test_select_eintr(self):\r
167         # EINTR is supposed to be ignored\r
168         with mock.patch('pyftpdlib.ioloop.select.select',\r
169                         side_effect=select.error()) as m:\r
170             m.side_effect.errno = errno.EINTR\r
171             s, rd, wr = self.test_register()\r
172             s.poll(0)\r
173         # ...but just that\r
174         with mock.patch('pyftpdlib.ioloop.select.select',\r
175                         side_effect=select.error()) as m:\r
176             m.side_effect.errno = errno.EBADF\r
177             s, rd, wr = self.test_register()\r
178             self.assertRaises(select.error, s.poll, 0)\r
179 \r
180 \r
181 # ===================================================================\r
182 # poll()\r
183 # ===================================================================\r
184 \r
185 @unittest.skipUnless(hasattr(pyftpdlib.ioloop, 'Poll'),\r
186                      "poll() not available on this platform")\r
187 class PollIOLoopTestCase(TestCase, BaseIOLoopTestCase):\r
188     ioloop_class = getattr(pyftpdlib.ioloop, "Poll", None)\r
189     poller_mock = "pyftpdlib.ioloop.Poll._poller"\r
190 \r
191     @unittest.skipIf(sys.version_info[:2] == (3, 2), "")\r
192     def test_eintr_on_poll(self):\r
193         # EINTR is supposed to be ignored\r
194         with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
195             if not PY3:\r
196                 m.return_value.poll.side_effect = select.error\r
197                 m.return_value.poll.side_effect.errno = errno.EINTR\r
198             else:\r
199                 m.return_value.poll.side_effect = OSError(errno.EINTR, "")\r
200             s, rd, wr = self.test_register()\r
201             s.poll(0)\r
202             assert m.called\r
203         # ...but just that\r
204         with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
205             if not PY3:\r
206                 m.return_value.poll.side_effect = select.error\r
207                 m.return_value.poll.side_effect.errno = errno.EBADF\r
208             else:\r
209                 m.return_value.poll.side_effect = OSError(errno.EBADF, "")\r
210             s, rd, wr = self.test_register()\r
211             self.assertRaises(select.error, s.poll, 0)\r
212             assert m.called\r
213 \r
214     def test_eexist_on_register(self):\r
215         # EEXIST is supposed to be ignored\r
216         with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
217             m.return_value.register.side_effect = \\r
218                 EnvironmentError(errno.EEXIST, "")\r
219             s, rd, wr = self.test_register()\r
220         # ...but just that\r
221         with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
222             m.return_value.register.side_effect = \\r
223                 EnvironmentError(errno.EBADF, "")\r
224             self.assertRaises(EnvironmentError, self.test_register)\r
225 \r
226     def test_enoent_ebadf_on_unregister(self):\r
227         # ENOENT and EBADF are supposed to be ignored\r
228         for errnum in (errno.EBADF, errno.ENOENT):\r
229             with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
230                 m.return_value.unregister.side_effect = \\r
231                     EnvironmentError(errnum, "")\r
232                 s, rd, wr = self.test_register()\r
233                 s.unregister(rd)\r
234         # ...but just those\r
235         with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
236             m.return_value.unregister.side_effect = \\r
237                 EnvironmentError(errno.EEXIST, "")\r
238             s, rd, wr = self.test_register()\r
239             self.assertRaises(EnvironmentError, s.unregister, rd)\r
240 \r
241     def test_enoent_on_modify(self):\r
242         # ENOENT is supposed to be ignored\r
243         with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:\r
244             m.return_value.modify.side_effect = \\r
245                 OSError(errno.ENOENT, "")\r
246             s, rd, wr = self.test_register()\r
247             s.modify(rd, s.READ)\r
248 \r
249 \r
250 # ===================================================================\r
251 # epoll()\r
252 # ===================================================================\r
253 \r
254 @unittest.skipUnless(hasattr(pyftpdlib.ioloop, 'Epoll'),\r
255                      "epoll() not available on this platform (Linux only)")\r
256 class EpollIOLoopTestCase(PollIOLoopTestCase):\r
257     ioloop_class = getattr(pyftpdlib.ioloop, "Epoll", None)\r
258     poller_mock = "pyftpdlib.ioloop.Epoll._poller"\r
259 \r
260 \r
261 # ===================================================================\r
262 # /dev/poll\r
263 # ===================================================================\r
264 \r
265 @unittest.skipUnless(hasattr(pyftpdlib.ioloop, 'DevPoll'),\r
266                      "/dev/poll not available on this platform (Solaris only)")\r
267 class DevPollIOLoopTestCase(TestCase, BaseIOLoopTestCase):\r
268     ioloop_class = getattr(pyftpdlib.ioloop, "DevPoll", None)\r
269 \r
270 \r
271 # ===================================================================\r
272 # kqueue\r
273 # ===================================================================\r
274 \r
275 @unittest.skipUnless(hasattr(pyftpdlib.ioloop, 'Kqueue'),\r
276                      "/dev/poll not available on this platform (BSD only)")\r
277 class KqueueIOLoopTestCase(TestCase, BaseIOLoopTestCase):\r
278     ioloop_class = getattr(pyftpdlib.ioloop, "Kqueue", None)\r
279 \r
280 \r
281 class TestCallLater(TestCase):\r
282     """Tests for CallLater class."""\r
283 \r
284     def setUp(self):\r
285         self.ioloop = IOLoop.instance()\r
286         for task in self.ioloop.sched._tasks:\r
287             if not task.cancelled:\r
288                 task.cancel()\r
289         del self.ioloop.sched._tasks[:]\r
290 \r
291     def scheduler(self, timeout=0.01, count=100):\r
292         while self.ioloop.sched._tasks and count > 0:\r
293             self.ioloop.sched.poll()\r
294             count -= 1\r
295             time.sleep(timeout)\r
296 \r
297     def test_interface(self):\r
298         def fun():\r
299             return 0\r
300 \r
301         self.assertRaises(AssertionError, self.ioloop.call_later, -1, fun)\r
302         x = self.ioloop.call_later(3, fun)\r
303         self.assertEqual(x.cancelled, False)\r
304         x.cancel()\r
305         self.assertEqual(x.cancelled, True)\r
306         self.assertRaises(AssertionError, x.call)\r
307         self.assertRaises(AssertionError, x.reset)\r
308         x.cancel()\r
309 \r
310     def test_order(self):\r
311         def fun(x):\r
312             l.append(x)\r
313 \r
314         l = []\r
315         for x in [0.05, 0.04, 0.03, 0.02, 0.01]:\r
316             self.ioloop.call_later(x, fun, x)\r
317         self.scheduler()\r
318         self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])\r
319 \r
320     # The test is reliable only on those systems where time.time()\r
321     # provides time with a better precision than 1 second.\r
322     if not str(time.time()).endswith('.0'):\r
323         def test_reset(self):\r
324             def fun(x):\r
325                 l.append(x)\r
326 \r
327             l = []\r
328             self.ioloop.call_later(0.01, fun, 0.01)\r
329             self.ioloop.call_later(0.02, fun, 0.02)\r
330             self.ioloop.call_later(0.03, fun, 0.03)\r
331             x = self.ioloop.call_later(0.04, fun, 0.04)\r
332             self.ioloop.call_later(0.05, fun, 0.05)\r
333             time.sleep(0.1)\r
334             x.reset()\r
335             self.scheduler()\r
336             self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])\r
337 \r
338     def test_cancel(self):\r
339         def fun(x):\r
340             l.append(x)\r
341 \r
342         l = []\r
343         self.ioloop.call_later(0.01, fun, 0.01).cancel()\r
344         self.ioloop.call_later(0.02, fun, 0.02)\r
345         self.ioloop.call_later(0.03, fun, 0.03)\r
346         self.ioloop.call_later(0.04, fun, 0.04)\r
347         self.ioloop.call_later(0.05, fun, 0.05).cancel()\r
348         self.scheduler()\r
349         self.assertEqual(l, [0.02, 0.03, 0.04])\r
350 \r
351     def test_errback(self):\r
352         l = []\r
353         self.ioloop.call_later(\r
354             0.0, lambda: 1 // 0, _errback=lambda: l.append(True))\r
355         self.scheduler()\r
356         self.assertEqual(l, [True])\r
357 \r
358     def test__repr__(self):\r
359         repr(self.ioloop.call_later(0.01, lambda: 0, 0.01))\r
360 \r
361     def test__lt__(self):\r
362         a = self.ioloop.call_later(0.01, lambda: 0, 0.01)\r
363         b = self.ioloop.call_later(0.02, lambda: 0, 0.02)\r
364         self.assertTrue(a < b)\r
365 \r
366     def test__le__(self):\r
367         a = self.ioloop.call_later(0.01, lambda: 0, 0.01)\r
368         b = self.ioloop.call_later(0.02, lambda: 0, 0.02)\r
369         self.assertTrue(a <= b)\r
370 \r
371 \r
372 class TestCallEvery(TestCase):\r
373     """Tests for CallEvery class."""\r
374 \r
375     def setUp(self):\r
376         self.ioloop = IOLoop.instance()\r
377         for task in self.ioloop.sched._tasks:\r
378             if not task.cancelled:\r
379                 task.cancel()\r
380         del self.ioloop.sched._tasks[:]\r
381 \r
382     def scheduler(self, timeout=0.003):\r
383         stop_at = time.time() + timeout\r
384         while time.time() < stop_at:\r
385             self.ioloop.sched.poll()\r
386 \r
387     def test_interface(self):\r
388         def fun():\r
389             return 0\r
390 \r
391         self.assertRaises(AssertionError, self.ioloop.call_every, -1, fun)\r
392         x = self.ioloop.call_every(3, fun)\r
393         self.assertEqual(x.cancelled, False)\r
394         x.cancel()\r
395         self.assertEqual(x.cancelled, True)\r
396         self.assertRaises(AssertionError, x.call)\r
397         self.assertRaises(AssertionError, x.reset)\r
398         x.cancel()\r
399 \r
400     def test_only_once(self):\r
401         # make sure that callback is called only once per-loop\r
402         def fun():\r
403             l1.append(None)\r
404 \r
405         l1 = []\r
406         self.ioloop.call_every(0, fun)\r
407         self.ioloop.sched.poll()\r
408         self.assertEqual(l1, [None])\r
409 \r
410     def test_multi_0_timeout(self):\r
411         # make sure a 0 timeout callback is called as many times\r
412         # as the number of loops\r
413         def fun():\r
414             l.append(None)\r
415 \r
416         l = []\r
417         self.ioloop.call_every(0, fun)\r
418         for x in range(100):\r
419             self.ioloop.sched.poll()\r
420         self.assertEqual(len(l), 100)\r
421 \r
422     # run it on systems where time.time() has a higher precision\r
423     if POSIX:\r
424         def test_low_and_high_timeouts(self):\r
425             # make sure a callback with a lower timeout is called more\r
426             # frequently than another with a greater timeout\r
427             def fun():\r
428                 l1.append(None)\r
429 \r
430             l1 = []\r
431             self.ioloop.call_every(0.001, fun)\r
432             self.scheduler()\r
433 \r
434             def fun():\r
435                 l2.append(None)\r
436 \r
437             l2 = []\r
438             self.ioloop.call_every(0.005, fun)\r
439             self.scheduler(timeout=0.01)\r
440 \r
441             self.assertTrue(len(l1) > len(l2))\r
442 \r
443     def test_cancel(self):\r
444         # make sure a cancelled callback doesn't get called anymore\r
445         def fun():\r
446             l.append(None)\r
447 \r
448         l = []\r
449         call = self.ioloop.call_every(0.001, fun)\r
450         self.scheduler()\r
451         len_l = len(l)\r
452         call.cancel()\r
453         self.scheduler()\r
454         self.assertEqual(len_l, len(l))\r
455 \r
456     def test_errback(self):\r
457         l = []\r
458         self.ioloop.call_every(\r
459             0.0, lambda: 1 // 0, _errback=lambda: l.append(True))\r
460         self.scheduler()\r
461         self.assertTrue(l)\r
462 \r
463 \r
464 class TestAsyncChat(TestCase):\r
465 \r
466     def get_connected_handler(self):\r
467         s = socket.socket()\r
468         self.addCleanup(s.close)\r
469         ac = AsyncChat(sock=s)\r
470         self.addCleanup(ac.close)\r
471         return ac\r
472 \r
473     def test_send_retry(self):\r
474         ac = self.get_connected_handler()\r
475         for errnum in pyftpdlib.ioloop._ERRNOS_RETRY:\r
476             with mock.patch("pyftpdlib.ioloop.socket.socket.send",\r
477                             side_effect=socket.error(errnum, "")) as m:\r
478                 self.assertEqual(ac.send(b"x"), 0)\r
479                 assert m.called\r
480 \r
481     def test_send_disconnect(self):\r
482         ac = self.get_connected_handler()\r
483         for errnum in pyftpdlib.ioloop._ERRNOS_DISCONNECTED:\r
484             with mock.patch("pyftpdlib.ioloop.socket.socket.send",\r
485                             side_effect=socket.error(errnum, "")) as send:\r
486                 with mock.patch.object(ac, "handle_close") as handle_close:\r
487                     self.assertEqual(ac.send(b"x"), 0)\r
488                     assert send.called\r
489                     assert handle_close.called\r
490 \r
491     def test_recv_retry(self):\r
492         ac = self.get_connected_handler()\r
493         for errnum in pyftpdlib.ioloop._ERRNOS_RETRY:\r
494             with mock.patch("pyftpdlib.ioloop.socket.socket.recv",\r
495                             side_effect=socket.error(errnum, "")) as m:\r
496                 self.assertRaises(RetryError, ac.recv, 1024)\r
497                 assert m.called\r
498 \r
499     def test_recv_disconnect(self):\r
500         ac = self.get_connected_handler()\r
501         for errnum in pyftpdlib.ioloop._ERRNOS_DISCONNECTED:\r
502             with mock.patch("pyftpdlib.ioloop.socket.socket.recv",\r
503                             side_effect=socket.error(errnum, "")) as send:\r
504                 with mock.patch.object(ac, "handle_close") as handle_close:\r
505                     self.assertEqual(ac.recv(b"x"), b'')\r
506                     assert send.called\r
507                     assert handle_close.called\r
508 \r
509     def test_connect_af_unspecified_err(self):\r
510         ac = AsyncChat()\r
511         with mock.patch.object(\r
512                 ac, "connect",\r
513                 side_effect=socket.error(errno.EBADF, "")) as m:\r
514             self.assertRaises(socket.error,\r
515                               ac.connect_af_unspecified, ("localhost", 0))\r
516             assert m.called\r
517             self.assertIsNone(ac.socket)\r
518 \r
519 \r
520 class TestAcceptor(TestCase):\r
521 \r
522     def test_bind_af_unspecified_err(self):\r
523         ac = Acceptor()\r
524         with mock.patch.object(\r
525                 ac, "bind",\r
526                 side_effect=socket.error(errno.EBADF, "")) as m:\r
527             self.assertRaises(socket.error,\r
528                               ac.bind_af_unspecified, ("localhost", 0))\r
529             assert m.called\r
530             self.assertIsNone(ac.socket)\r
531 \r
532     def test_handle_accept_econnacorted(self):\r
533         # https://github.com/giampaolo/pyftpdlib/issues/105\r
534         ac = Acceptor()\r
535         with mock.patch.object(\r
536                 ac, "accept",\r
537                 side_effect=socket.error(errno.ECONNABORTED, "")) as m:\r
538             ac.handle_accept()\r
539             assert m.called\r
540             self.assertIsNone(ac.socket)\r
541 \r
542     def test_handle_accept_typeerror(self):\r
543         # https://github.com/giampaolo/pyftpdlib/issues/91\r
544         ac = Acceptor()\r
545         with mock.patch.object(ac, "accept", side_effect=TypeError) as m:\r
546             ac.handle_accept()\r
547             assert m.called\r
548             self.assertIsNone(ac.socket)\r
549 \r
550 \r
551 if __name__ == '__main__':\r
552     unittest.main(verbosity=VERBOSITY)\r