X-FTP
[x93.git/.git] / xftp.git / ext / pyftpdlib / ioloop.py
1 # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.\r
2 # Use of this source code is governed by MIT license that can be\r
3 # found in the LICENSE file.\r
4 \r
5 """\r
6 A specialized IO loop on top of asyncore adding support for epoll()\r
7 on Linux and kqueue() and OSX/BSD, dramatically increasing performances\r
8 offered by base asyncore module.\r
9 \r
10 poll() and select() loops are also reimplemented and are an order of\r
11 magnitude faster as they support fd un/registration and modification.\r
12 \r
13 This module is not supposed to be used directly unless you want to\r
14 include a new dispatcher which runs within the main FTP server loop,\r
15 in which case:\r
16   __________________________________________________________________\r
17  |                      |                                           |\r
18  | INSTEAD OF           | ...USE:                                   |\r
19  |______________________|___________________________________________|\r
20  |                      |                                           |\r
21  | asyncore.dispacher   | Acceptor (for servers)                    |\r
22  | asyncore.dispacher   | Connector (for clients)                   |\r
23  | asynchat.async_chat  | AsyncChat (for a full duplex connection ) |\r
24  | asyncore.loop        | FTPServer.server_forever()                |\r
25  |______________________|___________________________________________|\r
26 \r
27 asyncore.dispatcher_with_send is not supported, same for "map" argument\r
28 for asyncore.loop and asyncore.dispatcher and asynchat.async_chat\r
29 constructors.\r
30 \r
31 Follows a server example:\r
32 \r
33 import socket\r
34 from pyftpdlib.ioloop import IOLoop, Acceptor, AsyncChat\r
35 \r
36 class Handler(AsyncChat):\r
37 \r
38     def __init__(self, sock):\r
39         AsyncChat.__init__(self, sock)\r
40         self.push('200 hello\r\n')\r
41         self.close_when_done()\r
42 \r
43 class Server(Acceptor):\r
44 \r
45     def __init__(self, host, port):\r
46         Acceptor.__init__(self)\r
47         self.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
48         self.set_reuse_addr()\r
49         self.bind((host, port))\r
50         self.listen(5)\r
51 \r
52     def handle_accepted(self, sock, addr):\r
53         Handler(sock)\r
54 \r
55 server = Server('localhost', 8021)\r
56 IOLoop.instance().loop()\r
57 """\r
58 \r
59 import asynchat\r
60 import asyncore\r
61 import errno\r
62 import heapq\r
63 import os\r
64 import select\r
65 import socket\r
66 import sys\r
67 import time\r
68 import traceback\r
69 try:\r
70     import threading\r
71 except ImportError:\r
72     import dummy_threading as threading\r
73 \r
74 from ._compat import callable\r
75 from .log import config_logging\r
76 from .log import debug\r
77 from .log import is_logging_configured\r
78 from .log import logger\r
79 \r
80 \r
81 timer = getattr(time, 'monotonic', time.time)\r
82 _read = asyncore.read\r
83 _write = asyncore.write\r
84 \r
85 # These errnos indicate that a connection has been abruptly terminated.\r
86 _ERRNOS_DISCONNECTED = set((\r
87     errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN, errno.ECONNABORTED,\r
88     errno.EPIPE, errno.EBADF, errno.ETIMEDOUT))\r
89 if hasattr(errno, "WSAECONNRESET"):\r
90     _ERRNOS_DISCONNECTED.add(errno.WSAECONNRESET)\r
91 if hasattr(errno, "WSAECONNABORTED"):\r
92     _ERRNOS_DISCONNECTED.add(errno.WSAECONNABORTED)\r
93 \r
94 # These errnos indicate that a non-blocking operation must be retried\r
95 # at a later time.\r
96 _ERRNOS_RETRY = set((errno.EAGAIN, errno.EWOULDBLOCK))\r
97 if hasattr(errno, "WSAEWOULDBLOCK"):\r
98     _ERRNOS_RETRY.add(errno.WSAEWOULDBLOCK)\r
99 \r
100 \r
101 class RetryError(Exception):\r
102     pass\r
103 \r
104 \r
105 # ===================================================================\r
106 # --- scheduler\r
107 # ===================================================================\r
108 \r
109 class _Scheduler(object):\r
110     """Run the scheduled functions due to expire soonest (if any)."""\r
111 \r
112     def __init__(self):\r
113         # the heap used for the scheduled tasks\r
114         self._tasks = []\r
115         self._cancellations = 0\r
116 \r
117     def poll(self):\r
118         """Run the scheduled functions due to expire soonest and\r
119         return the timeout of the next one (if any, else None).\r
120         """\r
121         now = timer()\r
122         calls = []\r
123         while self._tasks:\r
124             if now < self._tasks[0].timeout:\r
125                 break\r
126             call = heapq.heappop(self._tasks)\r
127             if call.cancelled:\r
128                 self._cancellations -= 1\r
129             else:\r
130                 calls.append(call)\r
131 \r
132         for call in calls:\r
133             if call._repush:\r
134                 heapq.heappush(self._tasks, call)\r
135                 call._repush = False\r
136                 continue\r
137             try:\r
138                 call.call()\r
139             except Exception:\r
140                 logger.error(traceback.format_exc())\r
141 \r
142         # remove cancelled tasks and re-heapify the queue if the\r
143         # number of cancelled tasks is more than the half of the\r
144         # entire queue\r
145         if self._cancellations > 512 and \\r
146                 self._cancellations > (len(self._tasks) >> 1):\r
147             debug("re-heapifying %s cancelled tasks" % self._cancellations)\r
148             self.reheapify()\r
149 \r
150         try:\r
151             return max(0, self._tasks[0].timeout - now)\r
152         except IndexError:\r
153             pass\r
154 \r
155     def register(self, what):\r
156         """Register a _CallLater instance."""\r
157         heapq.heappush(self._tasks, what)\r
158 \r
159     def unregister(self, what):\r
160         """Unregister a _CallLater instance.\r
161         The actual unregistration will happen at a later time though.\r
162         """\r
163         self._cancellations += 1\r
164 \r
165     def reheapify(self):\r
166         """Get rid of cancelled calls and reinitialize the internal heap."""\r
167         self._cancellations = 0\r
168         self._tasks = [x for x in self._tasks if not x.cancelled]\r
169         heapq.heapify(self._tasks)\r
170 \r
171 \r
172 class _CallLater(object):\r
173     """Container object which instance is returned by ioloop.call_later()."""\r
174 \r
175     __slots__ = ('_delay', '_target', '_args', '_kwargs', '_errback', '_sched',\r
176                  '_repush', 'timeout', 'cancelled')\r
177 \r
178     def __init__(self, seconds, target, *args, **kwargs):\r
179         assert callable(target), "%s is not callable" % target\r
180         assert sys.maxsize >= seconds >= 0, \\r
181             "%s is not greater than or equal to 0 seconds" % seconds\r
182         self._delay = seconds\r
183         self._target = target\r
184         self._args = args\r
185         self._kwargs = kwargs\r
186         self._errback = kwargs.pop('_errback', None)\r
187         self._sched = kwargs.pop('_scheduler')\r
188         self._repush = False\r
189         # seconds from the epoch at which to call the function\r
190         if not seconds:\r
191             self.timeout = 0\r
192         else:\r
193             self.timeout = timer() + self._delay\r
194         self.cancelled = False\r
195         self._sched.register(self)\r
196 \r
197     def __lt__(self, other):\r
198         return self.timeout < other.timeout\r
199 \r
200     def __le__(self, other):\r
201         return self.timeout <= other.timeout\r
202 \r
203     def __repr__(self):\r
204         if self._target is None:\r
205             sig = object.__repr__(self)\r
206         else:\r
207             sig = repr(self._target)\r
208         sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % (\r
209             self._args or '[]', self._kwargs or '{}', self.cancelled,\r
210             self._delay)\r
211         return '<%s>' % sig\r
212 \r
213     __str__ = __repr__\r
214 \r
215     def _post_call(self, exc):\r
216         if not self.cancelled:\r
217             self.cancel()\r
218 \r
219     def call(self):\r
220         """Call this scheduled function."""\r
221         assert not self.cancelled, "already cancelled"\r
222         exc = None\r
223         try:\r
224             self._target(*self._args, **self._kwargs)\r
225         except Exception as _:\r
226             exc = _\r
227             if self._errback is not None:\r
228                 self._errback()\r
229             else:\r
230                 raise\r
231         finally:\r
232             self._post_call(exc)\r
233 \r
234     def reset(self):\r
235         """Reschedule this call resetting the current countdown."""\r
236         assert not self.cancelled, "already cancelled"\r
237         self.timeout = timer() + self._delay\r
238         self._repush = True\r
239 \r
240     def cancel(self):\r
241         """Unschedule this call."""\r
242         if not self.cancelled:\r
243             self.cancelled = True\r
244             self._target = self._args = self._kwargs = self._errback = None\r
245             self._sched.unregister(self)\r
246 \r
247 \r
248 class _CallEvery(_CallLater):\r
249     """Container object which instance is returned by IOLoop.call_every()."""\r
250 \r
251     def _post_call(self, exc):\r
252         if not self.cancelled:\r
253             if exc:\r
254                 self.cancel()\r
255             else:\r
256                 self.timeout = timer() + self._delay\r
257                 self._sched.register(self)\r
258 \r
259 \r
260 class _IOLoop(object):\r
261     """Base class which will later be referred as IOLoop."""\r
262 \r
263     READ = 1\r
264     WRITE = 2\r
265     _instance = None\r
266     _lock = threading.Lock()\r
267     _started_once = False\r
268 \r
269     def __init__(self):\r
270         self.socket_map = {}\r
271         self.sched = _Scheduler()\r
272 \r
273     def __enter__(self):\r
274         return self\r
275 \r
276     def __exit__(self, *args):\r
277         self.close()\r
278 \r
279     def __repr__(self):\r
280         status = [self.__class__.__module__ + "." + self.__class__.__name__]\r
281         status.append("(fds=%s, tasks=%s)" % (\r
282             len(self.socket_map), len(self.sched._tasks)))\r
283         return '<%s at %#x>' % (' '.join(status), id(self))\r
284 \r
285     __str__ = __repr__\r
286 \r
287     @classmethod\r
288     def instance(cls):\r
289         """Return a global IOLoop instance."""\r
290         if cls._instance is None:\r
291             with cls._lock:\r
292                 if cls._instance is None:\r
293                     cls._instance = cls()\r
294         return cls._instance\r
295 \r
296     def register(self, fd, instance, events):\r
297         """Register a fd, handled by instance for the given events."""\r
298         raise NotImplementedError('must be implemented in subclass')\r
299 \r
300     def unregister(self, fd):\r
301         """Register fd."""\r
302         raise NotImplementedError('must be implemented in subclass')\r
303 \r
304     def modify(self, fd, events):\r
305         """Changes the events assigned for fd."""\r
306         raise NotImplementedError('must be implemented in subclass')\r
307 \r
308     def poll(self, timeout):\r
309         """Poll once.  The subclass overriding this method is supposed\r
310         to poll over the registered handlers and the scheduled functions\r
311         and then return.\r
312         """\r
313         raise NotImplementedError('must be implemented in subclass')\r
314 \r
315     def loop(self, timeout=None, blocking=True):\r
316         """Start the asynchronous IO loop.\r
317 \r
318          - (float) timeout: the timeout passed to the underlying\r
319            multiplex syscall (select(), epoll() etc.).\r
320 \r
321          - (bool) blocking: if True poll repeatedly, as long as there\r
322            are registered handlers and/or scheduled functions.\r
323            If False poll only once and return the timeout of the next\r
324            scheduled call (if any, else None).\r
325         """\r
326         if not _IOLoop._started_once:\r
327             _IOLoop._started_once = True\r
328             if not is_logging_configured():\r
329                 # If we get to this point it means the user hasn't\r
330                 # configured logging. We want to log by default so\r
331                 # we configure logging ourselves so that it will\r
332                 # print to stderr.\r
333                 config_logging()\r
334 \r
335         if blocking:\r
336             # localize variable access to minimize overhead\r
337             poll = self.poll\r
338             socket_map = self.socket_map\r
339             sched_poll = self.sched.poll\r
340 \r
341             if timeout is not None:\r
342                 while socket_map:\r
343                     poll(timeout)\r
344                     sched_poll()\r
345             else:\r
346                 soonest_timeout = None\r
347                 while socket_map:\r
348                     poll(soonest_timeout)\r
349                     soonest_timeout = sched_poll()\r
350         else:\r
351             sched = self.sched\r
352             if self.socket_map:\r
353                 self.poll(timeout)\r
354             if sched._tasks:\r
355                 return sched.poll()\r
356 \r
357     def call_later(self, seconds, target, *args, **kwargs):\r
358         """Calls a function at a later time.\r
359         It can be used to asynchronously schedule a call within the polling\r
360         loop without blocking it. The instance returned is an object that\r
361         can be used to cancel or reschedule the call.\r
362 \r
363          - (int) seconds: the number of seconds to wait\r
364          - (obj) target: the callable object to call later\r
365          - args: the arguments to call it with\r
366          - kwargs: the keyword arguments to call it with; a special\r
367            '_errback' parameter can be passed: it is a callable\r
368            called in case target function raises an exception.\r
369        """\r
370         kwargs['_scheduler'] = self.sched\r
371         return _CallLater(seconds, target, *args, **kwargs)\r
372 \r
373     def call_every(self, seconds, target, *args, **kwargs):\r
374         """Schedules the given callback to be called periodically."""\r
375         kwargs['_scheduler'] = self.sched\r
376         return _CallEvery(seconds, target, *args, **kwargs)\r
377 \r
378     def close(self):\r
379         """Closes the IOLoop, freeing any resources used."""\r
380         debug("closing IOLoop", self)\r
381         self.__class__._instance = None\r
382 \r
383         # free connections\r
384         instances = sorted(self.socket_map.values(), key=lambda x: x._fileno)\r
385         for inst in instances:\r
386             try:\r
387                 inst.close()\r
388             except OSError as err:\r
389                 if err.errno != errno.EBADF:\r
390                     logger.error(traceback.format_exc())\r
391             except Exception:\r
392                 logger.error(traceback.format_exc())\r
393         self.socket_map.clear()\r
394 \r
395         # free scheduled functions\r
396         for x in self.sched._tasks:\r
397             try:\r
398                 if not x.cancelled:\r
399                     x.cancel()\r
400             except Exception:\r
401                 logger.error(traceback.format_exc())\r
402         del self.sched._tasks[:]\r
403 \r
404 \r
405 # ===================================================================\r
406 # --- select() - POSIX / Windows\r
407 # ===================================================================\r
408 \r
409 class Select(_IOLoop):\r
410     """select()-based poller."""\r
411 \r
412     def __init__(self):\r
413         _IOLoop.__init__(self)\r
414         self._r = []\r
415         self._w = []\r
416 \r
417     def register(self, fd, instance, events):\r
418         if fd not in self.socket_map:\r
419             self.socket_map[fd] = instance\r
420             if events & self.READ:\r
421                 self._r.append(fd)\r
422             if events & self.WRITE:\r
423                 self._w.append(fd)\r
424 \r
425     def unregister(self, fd):\r
426         try:\r
427             del self.socket_map[fd]\r
428         except KeyError:\r
429             debug("call: unregister(); fd was no longer in socket_map", self)\r
430         for l in (self._r, self._w):\r
431             try:\r
432                 l.remove(fd)\r
433             except ValueError:\r
434                 pass\r
435 \r
436     def modify(self, fd, events):\r
437         inst = self.socket_map.get(fd)\r
438         if inst is not None:\r
439             self.unregister(fd)\r
440             self.register(fd, inst, events)\r
441         else:\r
442             debug("call: modify(); fd was no longer in socket_map", self)\r
443 \r
444     def poll(self, timeout):\r
445         try:\r
446             r, w, e = select.select(self._r, self._w, [], timeout)\r
447         except select.error as err:\r
448             if getattr(err, "errno", None) == errno.EINTR:\r
449                 return\r
450             raise\r
451 \r
452         smap_get = self.socket_map.get\r
453         for fd in r:\r
454             obj = smap_get(fd)\r
455             if obj is None or not obj.readable():\r
456                 continue\r
457             _read(obj)\r
458         for fd in w:\r
459             obj = smap_get(fd)\r
460             if obj is None or not obj.writable():\r
461                 continue\r
462             _write(obj)\r
463 \r
464 \r
465 # ===================================================================\r
466 # --- poll() / epoll()\r
467 # ===================================================================\r
468 \r
469 class _BasePollEpoll(_IOLoop):\r
470     """This is common to both poll() (UNIX), epoll() (Linux) and\r
471     /dev/poll (Solaris) implementations which share almost the same\r
472     interface.\r
473     Not supposed to be used directly.\r
474     """\r
475 \r
476     def __init__(self):\r
477         _IOLoop.__init__(self)\r
478         self._poller = self._poller()\r
479 \r
480     def register(self, fd, instance, events):\r
481         try:\r
482             self._poller.register(fd, events)\r
483         except EnvironmentError as err:\r
484             if err.errno == errno.EEXIST:\r
485                 debug("call: register(); poller raised EEXIST; ignored", self)\r
486             else:\r
487                 raise\r
488         self.socket_map[fd] = instance\r
489 \r
490     def unregister(self, fd):\r
491         try:\r
492             del self.socket_map[fd]\r
493         except KeyError:\r
494             debug("call: unregister(); fd was no longer in socket_map", self)\r
495         else:\r
496             try:\r
497                 self._poller.unregister(fd)\r
498             except EnvironmentError as err:\r
499                 if err.errno in (errno.ENOENT, errno.EBADF):\r
500                     debug("call: unregister(); poller returned %r; "\r
501                           "ignoring it" % err, self)\r
502                 else:\r
503                     raise\r
504 \r
505     def modify(self, fd, events):\r
506         try:\r
507             self._poller.modify(fd, events)\r
508         except OSError as err:\r
509             if err.errno == errno.ENOENT and fd in self.socket_map:\r
510                 # XXX - see:\r
511                 # https://github.com/giampaolo/pyftpdlib/issues/329\r
512                 instance = self.socket_map[fd]\r
513                 self.register(fd, instance, events)\r
514             else:\r
515                 raise\r
516 \r
517     def poll(self, timeout):\r
518         try:\r
519             events = self._poller.poll(timeout or -1)  # -1 waits indefinitely\r
520         except (IOError, select.error) as err:\r
521             # for epoll() and poll() respectively\r
522             if err.errno == errno.EINTR:\r
523                 return\r
524             raise\r
525         # localize variable access to minimize overhead\r
526         smap_get = self.socket_map.get\r
527         for fd, event in events:\r
528             inst = smap_get(fd)\r
529             if inst is None:\r
530                 continue\r
531             if event & self._ERROR and not event & self.READ:\r
532                 inst.handle_close()\r
533             else:\r
534                 if event & self.READ:\r
535                     if inst.readable():\r
536                         _read(inst)\r
537                 if event & self.WRITE:\r
538                     if inst.writable():\r
539                         _write(inst)\r
540 \r
541 \r
542 # ===================================================================\r
543 # --- poll() - POSIX\r
544 # ===================================================================\r
545 \r
546 if hasattr(select, 'poll'):\r
547 \r
548     class Poll(_BasePollEpoll):\r
549         """poll() based poller."""\r
550 \r
551         READ = select.POLLIN\r
552         WRITE = select.POLLOUT\r
553         _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL\r
554         _poller = select.poll\r
555 \r
556         def modify(self, fd, events):\r
557             inst = self.socket_map[fd]\r
558             self.unregister(fd)\r
559             self.register(fd, inst, events)\r
560 \r
561         def poll(self, timeout):\r
562             # poll() timeout is expressed in milliseconds\r
563             if timeout is not None:\r
564                 timeout = int(timeout * 1000)\r
565             _BasePollEpoll.poll(self, timeout)\r
566 \r
567 \r
568 # ===================================================================\r
569 # --- /dev/poll - Solaris (introduced in python 3.3)\r
570 # ===================================================================\r
571 \r
572 if hasattr(select, 'devpoll'):  # pragma: no cover\r
573 \r
574     class DevPoll(_BasePollEpoll):\r
575         """/dev/poll based poller (introduced in python 3.3)."""\r
576 \r
577         READ = select.POLLIN\r
578         WRITE = select.POLLOUT\r
579         _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL\r
580         _poller = select.devpoll\r
581 \r
582         # introduced in python 3.4\r
583         if hasattr(select.devpoll, 'fileno'):\r
584             def fileno(self):\r
585                 """Return devpoll() fd."""\r
586                 return self._poller.fileno()\r
587 \r
588         def modify(self, fd, events):\r
589             inst = self.socket_map[fd]\r
590             self.unregister(fd)\r
591             self.register(fd, inst, events)\r
592 \r
593         def poll(self, timeout):\r
594             # /dev/poll timeout is expressed in milliseconds\r
595             if timeout is not None:\r
596                 timeout = int(timeout * 1000)\r
597             _BasePollEpoll.poll(self, timeout)\r
598 \r
599         # introduced in python 3.4\r
600         if hasattr(select.devpoll, 'close'):\r
601             def close(self):\r
602                 _IOLoop.close(self)\r
603                 self._poller.close()\r
604 \r
605 \r
606 # ===================================================================\r
607 # --- epoll() - Linux\r
608 # ===================================================================\r
609 \r
610 if hasattr(select, 'epoll'):\r
611 \r
612     class Epoll(_BasePollEpoll):\r
613         """epoll() based poller."""\r
614 \r
615         READ = select.EPOLLIN\r
616         WRITE = select.EPOLLOUT\r
617         _ERROR = select.EPOLLERR | select.EPOLLHUP\r
618         _poller = select.epoll\r
619 \r
620         def fileno(self):\r
621             """Return epoll() fd."""\r
622             return self._poller.fileno()\r
623 \r
624         def close(self):\r
625             _IOLoop.close(self)\r
626             self._poller.close()\r
627 \r
628 \r
629 # ===================================================================\r
630 # --- kqueue() - BSD / OSX\r
631 # ===================================================================\r
632 \r
633 if hasattr(select, 'kqueue'):  # pragma: no cover\r
634 \r
635     class Kqueue(_IOLoop):\r
636         """kqueue() based poller."""\r
637 \r
638         def __init__(self):\r
639             _IOLoop.__init__(self)\r
640             self._kqueue = select.kqueue()\r
641             self._active = {}\r
642 \r
643         def fileno(self):\r
644             """Return kqueue() fd."""\r
645             return self._kqueue.fileno()\r
646 \r
647         def close(self):\r
648             _IOLoop.close(self)\r
649             self._kqueue.close()\r
650 \r
651         def register(self, fd, instance, events):\r
652             self.socket_map[fd] = instance\r
653             try:\r
654                 self._control(fd, events, select.KQ_EV_ADD)\r
655             except EnvironmentError as err:\r
656                 if err.errno == errno.EEXIST:\r
657                     debug("call: register(); poller raised EEXIST; ignored",\r
658                           self)\r
659                 else:\r
660                     raise\r
661             self._active[fd] = events\r
662 \r
663         def unregister(self, fd):\r
664             try:\r
665                 del self.socket_map[fd]\r
666                 events = self._active.pop(fd)\r
667             except KeyError:\r
668                 pass\r
669             else:\r
670                 try:\r
671                     self._control(fd, events, select.KQ_EV_DELETE)\r
672                 except EnvironmentError as err:\r
673                     if err.errno in (errno.ENOENT, errno.EBADF):\r
674                         debug("call: unregister(); poller returned %r; "\r
675                               "ignoring it" % err, self)\r
676                     else:\r
677                         raise\r
678 \r
679         def modify(self, fd, events):\r
680             instance = self.socket_map[fd]\r
681             self.unregister(fd)\r
682             self.register(fd, instance, events)\r
683 \r
684         def _control(self, fd, events, flags):\r
685             kevents = []\r
686             if events & self.WRITE:\r
687                 kevents.append(select.kevent(\r
688                     fd, filter=select.KQ_FILTER_WRITE, flags=flags))\r
689             if events & self.READ or not kevents:\r
690                 # always read when there is not a write\r
691                 kevents.append(select.kevent(\r
692                     fd, filter=select.KQ_FILTER_READ, flags=flags))\r
693             # even though control() takes a list, it seems to return\r
694             # EINVAL on Mac OS X (10.6) when there is more than one\r
695             # event in the list\r
696             for kevent in kevents:\r
697                 self._kqueue.control([kevent], 0)\r
698 \r
699         # localize variable access to minimize overhead\r
700         def poll(self,\r
701                  timeout,\r
702                  _len=len,\r
703                  _READ=select.KQ_FILTER_READ,\r
704                  _WRITE=select.KQ_FILTER_WRITE,\r
705                  _EOF=select.KQ_EV_EOF,\r
706                  _ERROR=select.KQ_EV_ERROR):\r
707             try:\r
708                 kevents = self._kqueue.control(None, _len(self.socket_map),\r
709                                                timeout)\r
710             except OSError as err:\r
711                 if err.errno == errno.EINTR:\r
712                     return\r
713                 raise\r
714             for kevent in kevents:\r
715                 inst = self.socket_map.get(kevent.ident)\r
716                 if inst is None:\r
717                     continue\r
718                 if kevent.filter == _READ:\r
719                     if inst.readable():\r
720                         _read(inst)\r
721                 if kevent.filter == _WRITE:\r
722                     if kevent.flags & _EOF:\r
723                         # If an asynchronous connection is refused,\r
724                         # kqueue returns a write event with the EOF\r
725                         # flag set.\r
726                         # Note that for read events, EOF may be returned\r
727                         # before all data has been consumed from the\r
728                         # socket buffer, so we only check for EOF on\r
729                         # write events.\r
730                         inst.handle_close()\r
731                     else:\r
732                         if inst.writable():\r
733                             _write(inst)\r
734                 if kevent.flags & _ERROR:\r
735                     inst.handle_close()\r
736 \r
737 \r
738 # ===================================================================\r
739 # --- choose the better poller for this platform\r
740 # ===================================================================\r
741 \r
742 if hasattr(select, 'epoll'):      # epoll() - Linux\r
743     IOLoop = Epoll\r
744 elif hasattr(select, 'kqueue'):   # kqueue() - BSD / OSX\r
745     IOLoop = Kqueue\r
746 elif hasattr(select, 'devpoll'):  # /dev/poll - Solaris\r
747     IOLoop = DevPoll\r
748 elif hasattr(select, 'poll'):     # poll() - POSIX\r
749     IOLoop = Poll\r
750 else:                             # select() - POSIX and Windows\r
751     IOLoop = Select\r
752 \r
753 \r
754 # ===================================================================\r
755 # --- asyncore dispatchers\r
756 # ===================================================================\r
757 \r
758 # these are overridden in order to register() and unregister()\r
759 # file descriptors against the new pollers\r
760 \r
761 \r
762 class AsyncChat(asynchat.async_chat):\r
763     """Same as asynchat.async_chat, only working with the new IO poller\r
764     and being more clever in avoid registering for read events when\r
765     it shouldn't.\r
766     """\r
767 \r
768     def __init__(self, sock=None, ioloop=None):\r
769         self.ioloop = ioloop or IOLoop.instance()\r
770         self._wanted_io_events = self.ioloop.READ\r
771         self._current_io_events = self.ioloop.READ\r
772         self._closed = False\r
773         self._closing = False\r
774         self._fileno = sock.fileno() if sock else None\r
775         self._tasks = []\r
776         asynchat.async_chat.__init__(self, sock)\r
777 \r
778     # --- IO loop related methods\r
779 \r
780     def add_channel(self, map=None, events=None):\r
781         assert self._fileno, repr(self._fileno)\r
782         events = events if events is not None else self.ioloop.READ\r
783         self.ioloop.register(self._fileno, self, events)\r
784         self._wanted_io_events = events\r
785         self._current_io_events = events\r
786 \r
787     def del_channel(self, map=None):\r
788         if self._fileno is not None:\r
789             self.ioloop.unregister(self._fileno)\r
790 \r
791     def modify_ioloop_events(self, events, logdebug=False):\r
792         if not self._closed:\r
793             assert self._fileno, repr(self._fileno)\r
794             if self._fileno not in self.ioloop.socket_map:\r
795                 debug(\r
796                     "call: modify_ioloop_events(), fd was no longer in "\r
797                     "socket_map, had to register() it again", inst=self)\r
798                 self.add_channel(events=events)\r
799             else:\r
800                 if events != self._current_io_events:\r
801                     if logdebug:\r
802                         if events == self.ioloop.READ:\r
803                             ev = "R"\r
804                         elif events == self.ioloop.WRITE:\r
805                             ev = "W"\r
806                         elif events == self.ioloop.READ | self.ioloop.WRITE:\r
807                             ev = "RW"\r
808                         else:\r
809                             ev = events\r
810                         debug("call: IOLoop.modify(); setting %r IO events" % (\r
811                             ev), self)\r
812                     self.ioloop.modify(self._fileno, events)\r
813             self._current_io_events = events\r
814         else:\r
815             debug(\r
816                 "call: modify_ioloop_events(), handler had already been "\r
817                 "close()d, skipping modify()", inst=self)\r
818 \r
819     # --- utils\r
820 \r
821     def call_later(self, seconds, target, *args, **kwargs):\r
822         """Same as self.ioloop.call_later but also cancel()s the\r
823         scheduled function on close().\r
824         """\r
825         if '_errback' not in kwargs and hasattr(self, 'handle_error'):\r
826             kwargs['_errback'] = self.handle_error\r
827         callback = self.ioloop.call_later(seconds, target, *args, **kwargs)\r
828         self._tasks.append(callback)\r
829         return callback\r
830 \r
831     # --- overridden asynchat methods\r
832 \r
833     def connect(self, addr):\r
834         self.modify_ioloop_events(self.ioloop.WRITE)\r
835         asynchat.async_chat.connect(self, addr)\r
836 \r
837     def connect_af_unspecified(self, addr, source_address=None):\r
838         """Same as connect() but guesses address family from addr.\r
839         Return the address family just determined.\r
840         """\r
841         assert self.socket is None\r
842         host, port = addr\r
843         err = "getaddrinfo() returned an empty list"\r
844         info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,\r
845                                   socket.SOCK_STREAM, 0, socket.AI_PASSIVE)\r
846         for res in info:\r
847             self.socket = None\r
848             af, socktype, proto, canonname, sa = res\r
849             try:\r
850                 self.create_socket(af, socktype)\r
851                 if source_address:\r
852                     if source_address[0].startswith('::ffff:'):\r
853                         # In this scenario, the server has an IPv6 socket, but\r
854                         # the remote client is using IPv4 and its address is\r
855                         # represented as an IPv4-mapped IPv6 address which\r
856                         # looks like this ::ffff:151.12.5.65, see:\r
857                         # http://en.wikipedia.org/wiki/IPv6\\r
858                         #     IPv4-mapped_addresses\r
859                         # http://tools.ietf.org/html/rfc3493.html#section-3.7\r
860                         # We truncate the first bytes to make it look like a\r
861                         # common IPv4 address.\r
862                         source_address = (source_address[0][7:],\r
863                                           source_address[1])\r
864                     self.bind(source_address)\r
865                 self.connect((host, port))\r
866             except socket.error as _:\r
867                 err = _\r
868                 if self.socket is not None:\r
869                     self.socket.close()\r
870                     self.del_channel()\r
871                     self.socket = None\r
872                 continue\r
873             break\r
874         if self.socket is None:\r
875             self.del_channel()\r
876             raise socket.error(err)\r
877         return af\r
878 \r
879     # send() and recv() overridden as a fix around various bugs:\r
880     # - http://bugs.python.org/issue1736101\r
881     # - https://github.com/giampaolo/pyftpdlib/issues/104\r
882     # - https://github.com/giampaolo/pyftpdlib/issues/109\r
883 \r
884     def send(self, data):\r
885         try:\r
886             return self.socket.send(data)\r
887         except socket.error as err:\r
888             debug("call: send(), err: %s" % err, inst=self)\r
889             if err.errno in _ERRNOS_RETRY:\r
890                 return 0\r
891             elif err.errno in _ERRNOS_DISCONNECTED:\r
892                 self.handle_close()\r
893                 return 0\r
894             else:\r
895                 raise\r
896 \r
897     def recv(self, buffer_size):\r
898         try:\r
899             data = self.socket.recv(buffer_size)\r
900         except socket.error as err:\r
901             debug("call: recv(), err: %s" % err, inst=self)\r
902             if err.errno in _ERRNOS_DISCONNECTED:\r
903                 self.handle_close()\r
904                 return b''\r
905             elif err.errno in _ERRNOS_RETRY:\r
906                 raise RetryError\r
907             else:\r
908                 raise\r
909         else:\r
910             if not data:\r
911                 # a closed connection is indicated by signaling\r
912                 # a read condition, and having recv() return 0.\r
913                 self.handle_close()\r
914                 return b''\r
915             else:\r
916                 return data\r
917 \r
918     def handle_read(self):\r
919         try:\r
920             asynchat.async_chat.handle_read(self)\r
921         except RetryError:\r
922             # This can be raised by (the overridden) recv().\r
923             pass\r
924 \r
925     def initiate_send(self):\r
926         asynchat.async_chat.initiate_send(self)\r
927         if not self._closed:\r
928             # if there's still data to send we want to be ready\r
929             # for writing, else we're only intereseted in reading\r
930             if not self.producer_fifo:\r
931                 wanted = self.ioloop.READ\r
932             else:\r
933                 # In FTPHandler, we also want to listen for user input\r
934                 # hence the READ. DTPHandler has its own initiate_send()\r
935                 # which will either READ or WRITE.\r
936                 wanted = self.ioloop.READ | self.ioloop.WRITE\r
937             if self._wanted_io_events != wanted:\r
938                 self.ioloop.modify(self._fileno, wanted)\r
939                 self._wanted_io_events = wanted\r
940         else:\r
941             debug("call: initiate_send(); called with no connection",\r
942                   inst=self)\r
943 \r
944     def close_when_done(self):\r
945         if len(self.producer_fifo) == 0:\r
946             self.handle_close()\r
947         else:\r
948             self._closing = True\r
949             asynchat.async_chat.close_when_done(self)\r
950 \r
951     def close(self):\r
952         if not self._closed:\r
953             self._closed = True\r
954             try:\r
955                 asynchat.async_chat.close(self)\r
956             finally:\r
957                 for fun in self._tasks:\r
958                     try:\r
959                         fun.cancel()\r
960                     except Exception:\r
961                         logger.error(traceback.format_exc())\r
962                 self._tasks = []\r
963                 self._closed = True\r
964                 self._closing = False\r
965                 self.connected = False\r
966 \r
967 \r
968 class Connector(AsyncChat):\r
969     """Same as base AsyncChat and supposed to be used for\r
970     clients.\r
971     """\r
972 \r
973     def add_channel(self, map=None, events=None):\r
974         AsyncChat.add_channel(self, map=map, events=self.ioloop.WRITE)\r
975 \r
976 \r
977 class Acceptor(AsyncChat):\r
978     """Same as base AsyncChat and supposed to be used to\r
979     accept new connections.\r
980     """\r
981 \r
982     def add_channel(self, map=None, events=None):\r
983         AsyncChat.add_channel(self, map=map, events=self.ioloop.READ)\r
984 \r
985     def bind_af_unspecified(self, addr):\r
986         """Same as bind() but guesses address family from addr.\r
987         Return the address family just determined.\r
988         """\r
989         assert self.socket is None\r
990         host, port = addr\r
991         if host == "":\r
992             # When using bind() "" is a symbolic name meaning all\r
993             # available interfaces. People might not know we're\r
994             # using getaddrinfo() internally, which uses None\r
995             # instead of "", so we'll make the conversion for them.\r
996             host = None\r
997         err = "getaddrinfo() returned an empty list"\r
998         info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,\r
999                                   socket.SOCK_STREAM, 0, socket.AI_PASSIVE)\r
1000         for res in info:\r
1001             self.socket = None\r
1002             self.del_channel()\r
1003             af, socktype, proto, canonname, sa = res\r
1004             try:\r
1005                 self.create_socket(af, socktype)\r
1006                 self.set_reuse_addr()\r
1007                 self.bind(sa)\r
1008             except socket.error as _:\r
1009                 err = _\r
1010                 if self.socket is not None:\r
1011                     self.socket.close()\r
1012                     self.del_channel()\r
1013                     self.socket = None\r
1014                 continue\r
1015             break\r
1016         if self.socket is None:\r
1017             self.del_channel()\r
1018             raise socket.error(err)\r
1019         return af\r
1020 \r
1021     def listen(self, num):\r
1022         AsyncChat.listen(self, num)\r
1023         # XXX - this seems to be necessary, otherwise kqueue.control()\r
1024         # won't return listening fd events\r
1025         try:\r
1026             if isinstance(self.ioloop, Kqueue):\r
1027                 self.ioloop.modify(self._fileno, self.ioloop.READ)\r
1028         except NameError:\r
1029             pass\r
1030 \r
1031     def handle_accept(self):\r
1032         try:\r
1033             sock, addr = self.accept()\r
1034         except TypeError:\r
1035             # sometimes accept() might return None, see:\r
1036             # https://github.com/giampaolo/pyftpdlib/issues/91\r
1037             debug("call: handle_accept(); accept() returned None", self)\r
1038             return\r
1039         except socket.error as err:\r
1040             # ECONNABORTED might be thrown on *BSD, see:\r
1041             # https://github.com/giampaolo/pyftpdlib/issues/105\r
1042             if err.errno != errno.ECONNABORTED:\r
1043                 raise\r
1044             else:\r
1045                 debug("call: handle_accept(); accept() returned ECONNABORTED",\r
1046                       self)\r
1047         else:\r
1048             # sometimes addr == None instead of (ip, port) (see issue 104)\r
1049             if addr is not None:\r
1050                 self.handle_accepted(sock, addr)\r
1051 \r
1052     def handle_accepted(self, sock, addr):\r
1053         sock.close()\r
1054         self.log_info('unhandled accepted event', 'warning')\r
1055 \r
1056     # overridden for convenience; avoid to reuse address on Windows\r
1057     if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):\r
1058         def set_reuse_addr(self):\r
1059             pass\r