1 #!/usr/bin/env python
\r
3 # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
\r
4 # Use of this source code is governed by MIT license that can be
\r
5 # found in the LICENSE file.
\r
14 from pyftpdlib._compat import PY3
\r
15 from pyftpdlib.ioloop import Acceptor
\r
16 from pyftpdlib.ioloop import AsyncChat
\r
17 from pyftpdlib.ioloop import IOLoop
\r
18 from pyftpdlib.ioloop import RetryError
\r
19 from pyftpdlib.test import mock
\r
20 from pyftpdlib.test import POSIX
\r
21 from pyftpdlib.test import TestCase
\r
22 from pyftpdlib.test import unittest
\r
23 from pyftpdlib.test import VERBOSITY
\r
24 import pyftpdlib.ioloop
\r
27 if hasattr(socket, 'socketpair'):
\r
28 socketpair = socket.socketpair
\r
30 def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
\r
31 with contextlib.closing(socket.socket(family, type, proto)) as l:
\r
32 l.bind(("localhost", 0))
\r
34 c = socket.socket(family, type, proto)
\r
36 c.connect(l.getsockname())
\r
37 caddr = c.getsockname()
\r
39 a, addr = l.accept()
\r
40 # check that we've got the correct client
\r
49 # TODO: write more tests.
\r
50 class BaseIOLoopTestCase(object):
\r
54 def make_socketpair(self):
\r
55 rd, wr = socketpair()
\r
56 self.addCleanup(rd.close)
\r
57 self.addCleanup(wr.close)
\r
60 def test_register(self):
\r
61 s = self.ioloop_class()
\r
62 self.addCleanup(s.close)
\r
63 rd, wr = self.make_socketpair()
\r
64 handler = AsyncChat(rd)
\r
65 s.register(rd, handler, s.READ)
\r
66 s.register(wr, handler, s.WRITE)
\r
67 self.assertIn(rd, s.socket_map)
\r
68 self.assertIn(wr, s.socket_map)
\r
71 def test_unregister(self):
\r
72 s, rd, wr = self.test_register()
\r
75 self.assertNotIn(rd, s.socket_map)
\r
76 self.assertNotIn(wr, s.socket_map)
\r
78 def test_unregister_twice(self):
\r
79 s, rd, wr = self.test_register()
\r
def test_modify(self):
    """Swap the event masks of the sockets set up by test_register().

    The test passes as long as neither ``modify()`` call raises.
    """
    ioloop, reader, writer = self.test_register()
    # The socket registered for reading is switched to writing, and
    # the one registered for writing is switched to reading.
    ioloop.modify(reader, ioloop.WRITE)
    ioloop.modify(writer, ioloop.READ)
\r
90 def test_loop(self):
\r
92 s, rd, wr = self.test_register()
\r
93 s.call_later(0, s.close)
\r
96 s, rd, wr = self.test_register()
\r
97 s.call_later(0, s.close)
\r
98 s.loop(timeout=0.001)
\r
100 def test_close(self):
\r
101 s, rd, wr = self.test_register()
\r
103 self.assertEqual(s.socket_map, {})
\r
105 def test_close_w_handler_exc(self):
\r
106 # Simulate an exception when close()ing a socket handler.
\r
107 # Exception should be logged and ignored.
\r
108 class Handler(AsyncChat):
\r
113 s = self.ioloop_class()
\r
114 self.addCleanup(s.close)
\r
115 rd, wr = self.make_socketpair()
\r
116 handler = Handler(rd)
\r
117 s.register(rd, handler, s.READ)
\r
118 with mock.patch("pyftpdlib.ioloop.logger.error") as m:
\r
121 self.assertIn('ZeroDivisionError', m.call_args[0][0])
\r
123 def test_close_w_handler_ebadf_exc(self):
\r
124 # Simulate an exception when close()ing a socket handler.
\r
125 # Exception should be ignored (and not logged).
\r
126 class Handler(AsyncChat):
\r
129 raise OSError(errno.EBADF, "")
\r
131 s = self.ioloop_class()
\r
132 self.addCleanup(s.close)
\r
133 rd, wr = self.make_socketpair()
\r
134 handler = Handler(rd)
\r
135 s.register(rd, handler, s.READ)
\r
136 with mock.patch("pyftpdlib.ioloop.logger.error") as m:
\r
138 assert not m.called
\r
140 def test_close_w_callback_exc(self):
\r
141 # Simulate an exception when close()ing the IO loop and a
\r
142 # scheduled callback raises an exception on cancel().
\r
143 with mock.patch("pyftpdlib.ioloop.logger.error") as logerr:
\r
144 with mock.patch("pyftpdlib.ioloop._CallLater.cancel",
\r
145 side_effect=lambda: 1 / 0) as cancel:
\r
146 s = self.ioloop_class()
\r
147 self.addCleanup(s.close)
\r
148 s.call_later(1, lambda: 0)
\r
150 assert cancel.called
\r
151 assert logerr.called
\r
152 self.assertIn('ZeroDivisionError', logerr.call_args[0][0])
\r
class DefaultIOLoopTestCase(TestCase, BaseIOLoopTestCase):
    """Run the shared BaseIOLoopTestCase tests against ``IOLoop``."""

    # Same object as ``pyftpdlib.ioloop.IOLoop``; it is imported by name
    # at the top of this module.
    ioloop_class = IOLoop
\r
159 # ===================================================================
\r
161 # ===================================================================
\r
163 class SelectIOLoopTestCase(TestCase, BaseIOLoopTestCase):
\r
164 ioloop_class = pyftpdlib.ioloop.Select
\r
166 def test_select_eintr(self):
\r
167 # EINTR is supposed to be ignored
\r
168 with mock.patch('pyftpdlib.ioloop.select.select',
\r
169 side_effect=select.error()) as m:
\r
170 m.side_effect.errno = errno.EINTR
\r
171 s, rd, wr = self.test_register()
\r
174 with mock.patch('pyftpdlib.ioloop.select.select',
\r
175 side_effect=select.error()) as m:
\r
176 m.side_effect.errno = errno.EBADF
\r
177 s, rd, wr = self.test_register()
\r
178 self.assertRaises(select.error, s.poll, 0)
\r
181 # ===================================================================
\r
183 # ===================================================================
\r
185 @unittest.skipUnless(hasattr(pyftpdlib.ioloop, 'Poll'),
\r
186 "poll() not available on this platform")
\r
187 class PollIOLoopTestCase(TestCase, BaseIOLoopTestCase):
\r
188 ioloop_class = getattr(pyftpdlib.ioloop, "Poll", None)
\r
189 poller_mock = "pyftpdlib.ioloop.Poll._poller"
\r
191 @unittest.skipIf(sys.version_info[:2] == (3, 2), "")
\r
192 def test_eintr_on_poll(self):
\r
193 # EINTR is supposed to be ignored
\r
194 with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:
\r
196 m.return_value.poll.side_effect = select.error
\r
197 m.return_value.poll.side_effect.errno = errno.EINTR
\r
199 m.return_value.poll.side_effect = OSError(errno.EINTR, "")
\r
200 s, rd, wr = self.test_register()
\r
204 with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:
\r
206 m.return_value.poll.side_effect = select.error
\r
207 m.return_value.poll.side_effect.errno = errno.EBADF
\r
209 m.return_value.poll.side_effect = OSError(errno.EBADF, "")
\r
210 s, rd, wr = self.test_register()
\r
211 self.assertRaises(select.error, s.poll, 0)
\r
214 def test_eexist_on_register(self):
\r
215 # EEXIST is supposed to be ignored
\r
216 with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:
\r
217 m.return_value.register.side_effect = \
\r
218 EnvironmentError(errno.EEXIST, "")
\r
219 s, rd, wr = self.test_register()
\r
221 with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:
\r
222 m.return_value.register.side_effect = \
\r
223 EnvironmentError(errno.EBADF, "")
\r
224 self.assertRaises(EnvironmentError, self.test_register)
\r
226 def test_enoent_ebadf_on_unregister(self):
\r
227 # ENOENT and EBADF are supposed to be ignored
\r
228 for errnum in (errno.EBADF, errno.ENOENT):
\r
229 with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:
\r
230 m.return_value.unregister.side_effect = \
\r
231 EnvironmentError(errnum, "")
\r
232 s, rd, wr = self.test_register()
\r
234 # ...but just those
\r
235 with mock.patch(self.poller_mock, return_vaue=mock.Mock()) as m:
\r
236 m.return_value.unregister.side_effect = \
\r
237 EnvironmentError(errno.EEXIST, "")
\r
238 s, rd, wr = self.test_register()
\r
239 self.assertRaises(EnvironmentError, s.unregister, rd)
\r
def test_enoent_on_modify(self):
    """ENOENT raised by the poller's ``modify()`` is supposed to be ignored."""
    # NOTE: the keyword must be spelled ``return_value``. mock.patch()
    # silently accepts a misspelled keyword and merely sets an unrelated
    # attribute on the replacement mock, leaving the return value
    # unconfigured.
    with mock.patch(self.poller_mock, return_value=mock.Mock()) as m:
        m.return_value.modify.side_effect = OSError(errno.ENOENT, "")
        s, rd, wr = self.test_register()
        # Must not propagate the ENOENT error raised by the mocked poller.
        s.modify(rd, s.READ)
\r
250 # ===================================================================
\r
252 # ===================================================================
\r
@unittest.skipUnless(
    hasattr(pyftpdlib.ioloop, 'Epoll'),
    "epoll() not available on this platform (Linux only)")
class EpollIOLoopTestCase(PollIOLoopTestCase):
    """Re-run the PollIOLoopTestCase tests against the Epoll IO loop."""

    # Dotted path that the inherited tests hand to mock.patch().
    poller_mock = "pyftpdlib.ioloop.Epoll._poller"
    # getattr() with a None fallback keeps this class body importable
    # where pyftpdlib.ioloop has no Epoll; the decorator skips the tests.
    ioloop_class = getattr(pyftpdlib.ioloop, "Epoll", None)
\r
261 # ===================================================================
\r
263 # ===================================================================
\r
@unittest.skipUnless(
    hasattr(pyftpdlib.ioloop, 'DevPoll'),
    "/dev/poll not available on this platform (Solaris only)")
class DevPollIOLoopTestCase(TestCase, BaseIOLoopTestCase):
    """Run the shared BaseIOLoopTestCase tests against the DevPoll IO loop."""

    # getattr() with a None fallback keeps this class body importable
    # where pyftpdlib.ioloop has no DevPoll; the decorator skips the tests.
    ioloop_class = getattr(pyftpdlib.ioloop, "DevPoll", None)
\r
271 # ===================================================================
\r
273 # ===================================================================
\r
@unittest.skipUnless(
    hasattr(pyftpdlib.ioloop, 'Kqueue'),
    "kqueue() not available on this platform (BSD only)")
class KqueueIOLoopTestCase(TestCase, BaseIOLoopTestCase):
    """Run the shared BaseIOLoopTestCase tests against the Kqueue IO loop.

    The skip reason names kqueue(), the facility this class depends on,
    rather than /dev/poll.
    """

    # getattr() with a None fallback keeps this class body importable
    # where pyftpdlib.ioloop has no Kqueue; the decorator skips the tests.
    ioloop_class = getattr(pyftpdlib.ioloop, "Kqueue", None)
\r
281 class TestCallLater(TestCase):
\r
282 """Tests for CallLater class."""
\r
285 self.ioloop = IOLoop.instance()
\r
286 for task in self.ioloop.sched._tasks:
\r
287 if not task.cancelled:
\r
289 del self.ioloop.sched._tasks[:]
\r
291 def scheduler(self, timeout=0.01, count=100):
\r
292 while self.ioloop.sched._tasks and count > 0:
\r
293 self.ioloop.sched.poll()
\r
295 time.sleep(timeout)
\r
297 def test_interface(self):
\r
301 self.assertRaises(AssertionError, self.ioloop.call_later, -1, fun)
\r
302 x = self.ioloop.call_later(3, fun)
\r
303 self.assertEqual(x.cancelled, False)
\r
305 self.assertEqual(x.cancelled, True)
\r
306 self.assertRaises(AssertionError, x.call)
\r
307 self.assertRaises(AssertionError, x.reset)
\r
310 def test_order(self):
\r
315 for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
\r
316 self.ioloop.call_later(x, fun, x)
\r
318 self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
\r
320 # The test is reliable only on those systems where time.time()
\r
321 # provides time with a better precision than 1 second.
\r
322 if not str(time.time()).endswith('.0'):
\r
323 def test_reset(self):
\r
328 self.ioloop.call_later(0.01, fun, 0.01)
\r
329 self.ioloop.call_later(0.02, fun, 0.02)
\r
330 self.ioloop.call_later(0.03, fun, 0.03)
\r
331 x = self.ioloop.call_later(0.04, fun, 0.04)
\r
332 self.ioloop.call_later(0.05, fun, 0.05)
\r
336 self.assertEqual(l, [0.01, 0.02, 0.03, 0.05, 0.04])
\r
338 def test_cancel(self):
\r
343 self.ioloop.call_later(0.01, fun, 0.01).cancel()
\r
344 self.ioloop.call_later(0.02, fun, 0.02)
\r
345 self.ioloop.call_later(0.03, fun, 0.03)
\r
346 self.ioloop.call_later(0.04, fun, 0.04)
\r
347 self.ioloop.call_later(0.05, fun, 0.05).cancel()
\r
349 self.assertEqual(l, [0.02, 0.03, 0.04])
\r
351 def test_errback(self):
\r
353 self.ioloop.call_later(
\r
354 0.0, lambda: 1 // 0, _errback=lambda: l.append(True))
\r
356 self.assertEqual(l, [True])
\r
def test__repr__(self):
    """Smoke test: repr() of a scheduled call must not raise."""
    scheduled = self.ioloop.call_later(0.01, lambda: 0, 0.01)
    repr(scheduled)
\r
def test__lt__(self):
    """A call scheduled with a shorter delay compares ``<`` a longer one."""
    a = self.ioloop.call_later(0.01, lambda: 0, 0.01)
    b = self.ioloop.call_later(0.02, lambda: 0, 0.02)
    # assertLess still evaluates ``a < b`` but reports both operands
    # on failure, unlike assertTrue.
    self.assertLess(a, b)
\r
def test__le__(self):
    """A call scheduled with a shorter delay compares ``<=`` a longer one."""
    a = self.ioloop.call_later(0.01, lambda: 0, 0.01)
    b = self.ioloop.call_later(0.02, lambda: 0, 0.02)
    # assertLessEqual still evaluates ``a <= b`` but reports both
    # operands on failure, unlike assertTrue.
    self.assertLessEqual(a, b)
\r
372 class TestCallEvery(TestCase):
\r
373 """Tests for CallEvery class."""
\r
376 self.ioloop = IOLoop.instance()
\r
377 for task in self.ioloop.sched._tasks:
\r
378 if not task.cancelled:
\r
380 del self.ioloop.sched._tasks[:]
\r
def scheduler(self, timeout=0.003):
    """Poll the IOLoop scheduler repeatedly for *timeout* seconds.

    This is a busy loop: ``self.ioloop.sched.poll()`` is called back to
    back, with no sleep in between, until the wall-clock deadline passes.
    """
    deadline = time.time() + timeout
    while True:
        if not time.time() < deadline:
            break
        self.ioloop.sched.poll()
\r
387 def test_interface(self):
\r
391 self.assertRaises(AssertionError, self.ioloop.call_every, -1, fun)
\r
392 x = self.ioloop.call_every(3, fun)
\r
393 self.assertEqual(x.cancelled, False)
\r
395 self.assertEqual(x.cancelled, True)
\r
396 self.assertRaises(AssertionError, x.call)
\r
397 self.assertRaises(AssertionError, x.reset)
\r
400 def test_only_once(self):
\r
401 # make sure that callback is called only once per-loop
\r
406 self.ioloop.call_every(0, fun)
\r
407 self.ioloop.sched.poll()
\r
408 self.assertEqual(l1, [None])
\r
410 def test_multi_0_timeout(self):
\r
411 # make sure a 0 timeout callback is called as many times
\r
412 # as the number of loops
\r
417 self.ioloop.call_every(0, fun)
\r
418 for x in range(100):
\r
419 self.ioloop.sched.poll()
\r
420 self.assertEqual(len(l), 100)
\r
422 # run it on systems where time.time() has a higher precision
\r
424 def test_low_and_high_timeouts(self):
\r
425 # make sure a callback with a lower timeout is called more
\r
426 # frequently than another with a greater timeout
\r
431 self.ioloop.call_every(0.001, fun)
\r
438 self.ioloop.call_every(0.005, fun)
\r
439 self.scheduler(timeout=0.01)
\r
441 self.assertTrue(len(l1) > len(l2))
\r
443 def test_cancel(self):
\r
444 # make sure a cancelled callback doesn't get called anymore
\r
449 call = self.ioloop.call_every(0.001, fun)
\r
454 self.assertEqual(len_l, len(l))
\r
456 def test_errback(self):
\r
458 self.ioloop.call_every(
\r
459 0.0, lambda: 1 // 0, _errback=lambda: l.append(True))
\r
464 class TestAsyncChat(TestCase):
\r
466 def get_connected_handler(self):
\r
467 s = socket.socket()
\r
468 self.addCleanup(s.close)
\r
469 ac = AsyncChat(sock=s)
\r
470 self.addCleanup(ac.close)
\r
473 def test_send_retry(self):
\r
474 ac = self.get_connected_handler()
\r
475 for errnum in pyftpdlib.ioloop._ERRNOS_RETRY:
\r
476 with mock.patch("pyftpdlib.ioloop.socket.socket.send",
\r
477 side_effect=socket.error(errnum, "")) as m:
\r
478 self.assertEqual(ac.send(b"x"), 0)
\r
481 def test_send_disconnect(self):
\r
482 ac = self.get_connected_handler()
\r
483 for errnum in pyftpdlib.ioloop._ERRNOS_DISCONNECTED:
\r
484 with mock.patch("pyftpdlib.ioloop.socket.socket.send",
\r
485 side_effect=socket.error(errnum, "")) as send:
\r
486 with mock.patch.object(ac, "handle_close") as handle_close:
\r
487 self.assertEqual(ac.send(b"x"), 0)
\r
489 assert handle_close.called
\r
491 def test_recv_retry(self):
\r
492 ac = self.get_connected_handler()
\r
493 for errnum in pyftpdlib.ioloop._ERRNOS_RETRY:
\r
494 with mock.patch("pyftpdlib.ioloop.socket.socket.recv",
\r
495 side_effect=socket.error(errnum, "")) as m:
\r
496 self.assertRaises(RetryError, ac.recv, 1024)
\r
499 def test_recv_disconnect(self):
\r
500 ac = self.get_connected_handler()
\r
501 for errnum in pyftpdlib.ioloop._ERRNOS_DISCONNECTED:
\r
502 with mock.patch("pyftpdlib.ioloop.socket.socket.recv",
\r
503 side_effect=socket.error(errnum, "")) as send:
\r
504 with mock.patch.object(ac, "handle_close") as handle_close:
\r
505 self.assertEqual(ac.recv(b"x"), b'')
\r
507 assert handle_close.called
\r
509 def test_connect_af_unspecified_err(self):
\r
511 with mock.patch.object(
\r
513 side_effect=socket.error(errno.EBADF, "")) as m:
\r
514 self.assertRaises(socket.error,
\r
515 ac.connect_af_unspecified, ("localhost", 0))
\r
517 self.assertIsNone(ac.socket)
\r
520 class TestAcceptor(TestCase):
\r
522 def test_bind_af_unspecified_err(self):
\r
524 with mock.patch.object(
\r
526 side_effect=socket.error(errno.EBADF, "")) as m:
\r
527 self.assertRaises(socket.error,
\r
528 ac.bind_af_unspecified, ("localhost", 0))
\r
530 self.assertIsNone(ac.socket)
\r
532 def test_handle_accept_econnacorted(self):
\r
533 # https://github.com/giampaolo/pyftpdlib/issues/105
\r
535 with mock.patch.object(
\r
537 side_effect=socket.error(errno.ECONNABORTED, "")) as m:
\r
540 self.assertIsNone(ac.socket)
\r
542 def test_handle_accept_typeerror(self):
\r
543 # https://github.com/giampaolo/pyftpdlib/issues/91
\r
545 with mock.patch.object(ac, "accept", side_effect=TypeError) as m:
\r
548 self.assertIsNone(ac.socket)
\r
if __name__ == "__main__":
    # Executed as a script: run this module's tests using the VERBOSITY
    # value imported from pyftpdlib.test.
    unittest.main(verbosity=VERBOSITY)
\r