1 # Copyright (C) 2007 Giampaolo Rodola' <g.rodola@gmail.com>.
\r
2 # Use of this source code is governed by MIT license that can be
\r
3 # found in the LICENSE file.
\r
6 A specialized IO loop on top of asyncore adding support for epoll()
\r
7 on Linux and kqueue() and OSX/BSD, dramatically increasing performances
\r
8 offered by base asyncore module.
\r
10 poll() and select() loops are also reimplemented and are an order of
\r
11 magnitude faster as they support fd un/registration and modification.
\r
13 This module is not supposed to be used directly unless you want to
\r
14 include a new dispatcher which runs within the main FTP server loop,
\r
16 __________________________________________________________________
\r
18 | INSTEAD OF | ...USE: |
\r
19 |______________________|___________________________________________|
\r
21 | asyncore.dispacher | Acceptor (for servers) |
\r
22 | asyncore.dispacher | Connector (for clients) |
\r
23 | asynchat.async_chat | AsyncChat (for a full duplex connection ) |
\r
24 | asyncore.loop | FTPServer.server_forever() |
\r
25 |______________________|___________________________________________|
\r
27 asyncore.dispatcher_with_send is not supported, same for "map" argument
\r
28 for asyncore.loop and asyncore.dispatcher and asynchat.async_chat
\r
31 Follows a server example:
\r
34 from pyftpdlib.ioloop import IOLoop, Acceptor, AsyncChat
\r
36 class Handler(AsyncChat):
\r
38 def __init__(self, sock):
\r
39 AsyncChat.__init__(self, sock)
\r
40 self.push('200 hello\r\n')
\r
41 self.close_when_done()
\r
43 class Server(Acceptor):
\r
45 def __init__(self, host, port):
\r
46 Acceptor.__init__(self)
\r
47 self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
\r
48 self.set_reuse_addr()
\r
49 self.bind((host, port))
\r
52 def handle_accepted(self, sock, addr):
\r
55 server = Server('localhost', 8021)
\r
56 IOLoop.instance().loop()
\r
72 import dummy_threading as threading
\r
74 from ._compat import callable
\r
75 from .log import config_logging
\r
76 from .log import debug
\r
77 from .log import is_logging_configured
\r
78 from .log import logger
\r
81 timer = getattr(time, 'monotonic', time.time)
\r
82 _read = asyncore.read
\r
83 _write = asyncore.write
\r
85 # These errnos indicate that a connection has been abruptly terminated.
\r
86 _ERRNOS_DISCONNECTED = set((
\r
87 errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN, errno.ECONNABORTED,
\r
88 errno.EPIPE, errno.EBADF, errno.ETIMEDOUT))
\r
89 if hasattr(errno, "WSAECONNRESET"):
\r
90 _ERRNOS_DISCONNECTED.add(errno.WSAECONNRESET)
\r
91 if hasattr(errno, "WSAECONNABORTED"):
\r
92 _ERRNOS_DISCONNECTED.add(errno.WSAECONNABORTED)
\r
94 # These errnos indicate that a non-blocking operation must be retried
\r
96 _ERRNOS_RETRY = set((errno.EAGAIN, errno.EWOULDBLOCK))
\r
97 if hasattr(errno, "WSAEWOULDBLOCK"):
\r
98 _ERRNOS_RETRY.add(errno.WSAEWOULDBLOCK)
\r
101 class RetryError(Exception):
\r
105 # ===================================================================
\r
107 # ===================================================================
\r
109 class _Scheduler(object):
\r
110 """Run the scheduled functions due to expire soonest (if any)."""
\r
112 def __init__(self):
\r
113 # the heap used for the scheduled tasks
\r
115 self._cancellations = 0
\r
118 """Run the scheduled functions due to expire soonest and
\r
119 return the timeout of the next one (if any, else None).
\r
124 if now < self._tasks[0].timeout:
\r
126 call = heapq.heappop(self._tasks)
\r
128 self._cancellations -= 1
\r
134 heapq.heappush(self._tasks, call)
\r
135 call._repush = False
\r
140 logger.error(traceback.format_exc())
\r
142 # remove cancelled tasks and re-heapify the queue if the
\r
143 # number of cancelled tasks is more than the half of the
\r
145 if self._cancellations > 512 and \
\r
146 self._cancellations > (len(self._tasks) >> 1):
\r
147 debug("re-heapifying %s cancelled tasks" % self._cancellations)
\r
151 return max(0, self._tasks[0].timeout - now)
\r
155 def register(self, what):
\r
156 """Register a _CallLater instance."""
\r
157 heapq.heappush(self._tasks, what)
\r
159 def unregister(self, what):
\r
160 """Unregister a _CallLater instance.
\r
161 The actual unregistration will happen at a later time though.
\r
163 self._cancellations += 1
\r
165 def reheapify(self):
\r
166 """Get rid of cancelled calls and reinitialize the internal heap."""
\r
167 self._cancellations = 0
\r
168 self._tasks = [x for x in self._tasks if not x.cancelled]
\r
169 heapq.heapify(self._tasks)
\r
172 class _CallLater(object):
\r
173 """Container object which instance is returned by ioloop.call_later()."""
\r
175 __slots__ = ('_delay', '_target', '_args', '_kwargs', '_errback', '_sched',
\r
176 '_repush', 'timeout', 'cancelled')
\r
178 def __init__(self, seconds, target, *args, **kwargs):
\r
179 assert callable(target), "%s is not callable" % target
\r
180 assert sys.maxsize >= seconds >= 0, \
\r
181 "%s is not greater than or equal to 0 seconds" % seconds
\r
182 self._delay = seconds
\r
183 self._target = target
\r
185 self._kwargs = kwargs
\r
186 self._errback = kwargs.pop('_errback', None)
\r
187 self._sched = kwargs.pop('_scheduler')
\r
188 self._repush = False
\r
189 # seconds from the epoch at which to call the function
\r
193 self.timeout = timer() + self._delay
\r
194 self.cancelled = False
\r
195 self._sched.register(self)
\r
197 def __lt__(self, other):
\r
198 return self.timeout < other.timeout
\r
200 def __le__(self, other):
\r
201 return self.timeout <= other.timeout
\r
203 def __repr__(self):
\r
204 if self._target is None:
\r
205 sig = object.__repr__(self)
\r
207 sig = repr(self._target)
\r
208 sig += ' args=%s, kwargs=%s, cancelled=%s, secs=%s' % (
\r
209 self._args or '[]', self._kwargs or '{}', self.cancelled,
\r
211 return '<%s>' % sig
\r
215 def _post_call(self, exc):
\r
216 if not self.cancelled:
\r
220 """Call this scheduled function."""
\r
221 assert not self.cancelled, "already cancelled"
\r
224 self._target(*self._args, **self._kwargs)
\r
225 except Exception as _:
\r
227 if self._errback is not None:
\r
232 self._post_call(exc)
\r
235 """Reschedule this call resetting the current countdown."""
\r
236 assert not self.cancelled, "already cancelled"
\r
237 self.timeout = timer() + self._delay
\r
238 self._repush = True
\r
241 """Unschedule this call."""
\r
242 if not self.cancelled:
\r
243 self.cancelled = True
\r
244 self._target = self._args = self._kwargs = self._errback = None
\r
245 self._sched.unregister(self)
\r
248 class _CallEvery(_CallLater):
\r
249 """Container object which instance is returned by IOLoop.call_every()."""
\r
251 def _post_call(self, exc):
\r
252 if not self.cancelled:
\r
256 self.timeout = timer() + self._delay
\r
257 self._sched.register(self)
\r
260 class _IOLoop(object):
\r
261 """Base class which will later be referred as IOLoop."""
\r
266 _lock = threading.Lock()
\r
267 _started_once = False
\r
269 def __init__(self):
\r
270 self.socket_map = {}
\r
271 self.sched = _Scheduler()
\r
273 def __enter__(self):
\r
276 def __exit__(self, *args):
\r
279 def __repr__(self):
\r
280 status = [self.__class__.__module__ + "." + self.__class__.__name__]
\r
281 status.append("(fds=%s, tasks=%s)" % (
\r
282 len(self.socket_map), len(self.sched._tasks)))
\r
283 return '<%s at %#x>' % (' '.join(status), id(self))
\r
289 """Return a global IOLoop instance."""
\r
290 if cls._instance is None:
\r
292 if cls._instance is None:
\r
293 cls._instance = cls()
\r
294 return cls._instance
\r
296 def register(self, fd, instance, events):
\r
297 """Register a fd, handled by instance for the given events."""
\r
298 raise NotImplementedError('must be implemented in subclass')
\r
300 def unregister(self, fd):
\r
302 raise NotImplementedError('must be implemented in subclass')
\r
304 def modify(self, fd, events):
\r
305 """Changes the events assigned for fd."""
\r
306 raise NotImplementedError('must be implemented in subclass')
\r
308 def poll(self, timeout):
\r
309 """Poll once. The subclass overriding this method is supposed
\r
310 to poll over the registered handlers and the scheduled functions
\r
313 raise NotImplementedError('must be implemented in subclass')
\r
315 def loop(self, timeout=None, blocking=True):
\r
316 """Start the asynchronous IO loop.
\r
318 - (float) timeout: the timeout passed to the underlying
\r
319 multiplex syscall (select(), epoll() etc.).
\r
321 - (bool) blocking: if True poll repeatedly, as long as there
\r
322 are registered handlers and/or scheduled functions.
\r
323 If False poll only once and return the timeout of the next
\r
324 scheduled call (if any, else None).
\r
326 if not _IOLoop._started_once:
\r
327 _IOLoop._started_once = True
\r
328 if not is_logging_configured():
\r
329 # If we get to this point it means the user hasn't
\r
330 # configured logging. We want to log by default so
\r
331 # we configure logging ourselves so that it will
\r
336 # localize variable access to minimize overhead
\r
338 socket_map = self.socket_map
\r
339 sched_poll = self.sched.poll
\r
341 if timeout is not None:
\r
346 soonest_timeout = None
\r
348 poll(soonest_timeout)
\r
349 soonest_timeout = sched_poll()
\r
352 if self.socket_map:
\r
355 return sched.poll()
\r
357 def call_later(self, seconds, target, *args, **kwargs):
\r
358 """Calls a function at a later time.
\r
359 It can be used to asynchronously schedule a call within the polling
\r
360 loop without blocking it. The instance returned is an object that
\r
361 can be used to cancel or reschedule the call.
\r
363 - (int) seconds: the number of seconds to wait
\r
364 - (obj) target: the callable object to call later
\r
365 - args: the arguments to call it with
\r
366 - kwargs: the keyword arguments to call it with; a special
\r
367 '_errback' parameter can be passed: it is a callable
\r
368 called in case target function raises an exception.
\r
370 kwargs['_scheduler'] = self.sched
\r
371 return _CallLater(seconds, target, *args, **kwargs)
\r
373 def call_every(self, seconds, target, *args, **kwargs):
\r
374 """Schedules the given callback to be called periodically."""
\r
375 kwargs['_scheduler'] = self.sched
\r
376 return _CallEvery(seconds, target, *args, **kwargs)
\r
379 """Closes the IOLoop, freeing any resources used."""
\r
380 debug("closing IOLoop", self)
\r
381 self.__class__._instance = None
\r
384 instances = sorted(self.socket_map.values(), key=lambda x: x._fileno)
\r
385 for inst in instances:
\r
388 except OSError as err:
\r
389 if err.errno != errno.EBADF:
\r
390 logger.error(traceback.format_exc())
\r
392 logger.error(traceback.format_exc())
\r
393 self.socket_map.clear()
\r
395 # free scheduled functions
\r
396 for x in self.sched._tasks:
\r
398 if not x.cancelled:
\r
401 logger.error(traceback.format_exc())
\r
402 del self.sched._tasks[:]
\r
405 # ===================================================================
\r
406 # --- select() - POSIX / Windows
\r
407 # ===================================================================
\r
409 class Select(_IOLoop):
\r
410 """select()-based poller."""
\r
412 def __init__(self):
\r
413 _IOLoop.__init__(self)
\r
417 def register(self, fd, instance, events):
\r
418 if fd not in self.socket_map:
\r
419 self.socket_map[fd] = instance
\r
420 if events & self.READ:
\r
422 if events & self.WRITE:
\r
425 def unregister(self, fd):
\r
427 del self.socket_map[fd]
\r
429 debug("call: unregister(); fd was no longer in socket_map", self)
\r
430 for l in (self._r, self._w):
\r
436 def modify(self, fd, events):
\r
437 inst = self.socket_map.get(fd)
\r
438 if inst is not None:
\r
439 self.unregister(fd)
\r
440 self.register(fd, inst, events)
\r
442 debug("call: modify(); fd was no longer in socket_map", self)
\r
444 def poll(self, timeout):
\r
446 r, w, e = select.select(self._r, self._w, [], timeout)
\r
447 except select.error as err:
\r
448 if getattr(err, "errno", None) == errno.EINTR:
\r
452 smap_get = self.socket_map.get
\r
455 if obj is None or not obj.readable():
\r
460 if obj is None or not obj.writable():
\r
465 # ===================================================================
\r
466 # --- poll() / epoll()
\r
467 # ===================================================================
\r
469 class _BasePollEpoll(_IOLoop):
\r
470 """This is common to both poll() (UNIX), epoll() (Linux) and
\r
471 /dev/poll (Solaris) implementations which share almost the same
\r
473 Not supposed to be used directly.
\r
476 def __init__(self):
\r
477 _IOLoop.__init__(self)
\r
478 self._poller = self._poller()
\r
480 def register(self, fd, instance, events):
\r
482 self._poller.register(fd, events)
\r
483 except EnvironmentError as err:
\r
484 if err.errno == errno.EEXIST:
\r
485 debug("call: register(); poller raised EEXIST; ignored", self)
\r
488 self.socket_map[fd] = instance
\r
490 def unregister(self, fd):
\r
492 del self.socket_map[fd]
\r
494 debug("call: unregister(); fd was no longer in socket_map", self)
\r
497 self._poller.unregister(fd)
\r
498 except EnvironmentError as err:
\r
499 if err.errno in (errno.ENOENT, errno.EBADF):
\r
500 debug("call: unregister(); poller returned %r; "
\r
501 "ignoring it" % err, self)
\r
505 def modify(self, fd, events):
\r
507 self._poller.modify(fd, events)
\r
508 except OSError as err:
\r
509 if err.errno == errno.ENOENT and fd in self.socket_map:
\r
511 # https://github.com/giampaolo/pyftpdlib/issues/329
\r
512 instance = self.socket_map[fd]
\r
513 self.register(fd, instance, events)
\r
517 def poll(self, timeout):
\r
519 events = self._poller.poll(timeout or -1) # -1 waits indefinitely
\r
520 except (IOError, select.error) as err:
\r
521 # for epoll() and poll() respectively
\r
522 if err.errno == errno.EINTR:
\r
525 # localize variable access to minimize overhead
\r
526 smap_get = self.socket_map.get
\r
527 for fd, event in events:
\r
528 inst = smap_get(fd)
\r
531 if event & self._ERROR and not event & self.READ:
\r
532 inst.handle_close()
\r
534 if event & self.READ:
\r
535 if inst.readable():
\r
537 if event & self.WRITE:
\r
538 if inst.writable():
\r
542 # ===================================================================
\r
543 # --- poll() - POSIX
\r
544 # ===================================================================
\r
546 if hasattr(select, 'poll'):
\r
548 class Poll(_BasePollEpoll):
\r
549 """poll() based poller."""
\r
551 READ = select.POLLIN
\r
552 WRITE = select.POLLOUT
\r
553 _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
\r
554 _poller = select.poll
\r
556 def modify(self, fd, events):
\r
557 inst = self.socket_map[fd]
\r
558 self.unregister(fd)
\r
559 self.register(fd, inst, events)
\r
561 def poll(self, timeout):
\r
562 # poll() timeout is expressed in milliseconds
\r
563 if timeout is not None:
\r
564 timeout = int(timeout * 1000)
\r
565 _BasePollEpoll.poll(self, timeout)
\r
568 # ===================================================================
\r
569 # --- /dev/poll - Solaris (introduced in python 3.3)
\r
570 # ===================================================================
\r
572 if hasattr(select, 'devpoll'): # pragma: no cover
\r
574 class DevPoll(_BasePollEpoll):
\r
575 """/dev/poll based poller (introduced in python 3.3)."""
\r
577 READ = select.POLLIN
\r
578 WRITE = select.POLLOUT
\r
579 _ERROR = select.POLLERR | select.POLLHUP | select.POLLNVAL
\r
580 _poller = select.devpoll
\r
582 # introduced in python 3.4
\r
583 if hasattr(select.devpoll, 'fileno'):
\r
585 """Return devpoll() fd."""
\r
586 return self._poller.fileno()
\r
588 def modify(self, fd, events):
\r
589 inst = self.socket_map[fd]
\r
590 self.unregister(fd)
\r
591 self.register(fd, inst, events)
\r
593 def poll(self, timeout):
\r
594 # /dev/poll timeout is expressed in milliseconds
\r
595 if timeout is not None:
\r
596 timeout = int(timeout * 1000)
\r
597 _BasePollEpoll.poll(self, timeout)
\r
599 # introduced in python 3.4
\r
600 if hasattr(select.devpoll, 'close'):
\r
602 _IOLoop.close(self)
\r
603 self._poller.close()
\r
606 # ===================================================================
\r
607 # --- epoll() - Linux
\r
608 # ===================================================================
\r
610 if hasattr(select, 'epoll'):
\r
612 class Epoll(_BasePollEpoll):
\r
613 """epoll() based poller."""
\r
615 READ = select.EPOLLIN
\r
616 WRITE = select.EPOLLOUT
\r
617 _ERROR = select.EPOLLERR | select.EPOLLHUP
\r
618 _poller = select.epoll
\r
621 """Return epoll() fd."""
\r
622 return self._poller.fileno()
\r
625 _IOLoop.close(self)
\r
626 self._poller.close()
\r
629 # ===================================================================
\r
630 # --- kqueue() - BSD / OSX
\r
631 # ===================================================================
\r
633 if hasattr(select, 'kqueue'): # pragma: no cover
\r
635 class Kqueue(_IOLoop):
\r
636 """kqueue() based poller."""
\r
638 def __init__(self):
\r
639 _IOLoop.__init__(self)
\r
640 self._kqueue = select.kqueue()
\r
644 """Return kqueue() fd."""
\r
645 return self._kqueue.fileno()
\r
648 _IOLoop.close(self)
\r
649 self._kqueue.close()
\r
651 def register(self, fd, instance, events):
\r
652 self.socket_map[fd] = instance
\r
654 self._control(fd, events, select.KQ_EV_ADD)
\r
655 except EnvironmentError as err:
\r
656 if err.errno == errno.EEXIST:
\r
657 debug("call: register(); poller raised EEXIST; ignored",
\r
661 self._active[fd] = events
\r
663 def unregister(self, fd):
\r
665 del self.socket_map[fd]
\r
666 events = self._active.pop(fd)
\r
671 self._control(fd, events, select.KQ_EV_DELETE)
\r
672 except EnvironmentError as err:
\r
673 if err.errno in (errno.ENOENT, errno.EBADF):
\r
674 debug("call: unregister(); poller returned %r; "
\r
675 "ignoring it" % err, self)
\r
679 def modify(self, fd, events):
\r
680 instance = self.socket_map[fd]
\r
681 self.unregister(fd)
\r
682 self.register(fd, instance, events)
\r
684 def _control(self, fd, events, flags):
\r
686 if events & self.WRITE:
\r
687 kevents.append(select.kevent(
\r
688 fd, filter=select.KQ_FILTER_WRITE, flags=flags))
\r
689 if events & self.READ or not kevents:
\r
690 # always read when there is not a write
\r
691 kevents.append(select.kevent(
\r
692 fd, filter=select.KQ_FILTER_READ, flags=flags))
\r
693 # even though control() takes a list, it seems to return
\r
694 # EINVAL on Mac OS X (10.6) when there is more than one
\r
695 # event in the list
\r
696 for kevent in kevents:
\r
697 self._kqueue.control([kevent], 0)
\r
699 # localize variable access to minimize overhead
\r
703 _READ=select.KQ_FILTER_READ,
\r
704 _WRITE=select.KQ_FILTER_WRITE,
\r
705 _EOF=select.KQ_EV_EOF,
\r
706 _ERROR=select.KQ_EV_ERROR):
\r
708 kevents = self._kqueue.control(None, _len(self.socket_map),
\r
710 except OSError as err:
\r
711 if err.errno == errno.EINTR:
\r
714 for kevent in kevents:
\r
715 inst = self.socket_map.get(kevent.ident)
\r
718 if kevent.filter == _READ:
\r
719 if inst.readable():
\r
721 if kevent.filter == _WRITE:
\r
722 if kevent.flags & _EOF:
\r
723 # If an asynchronous connection is refused,
\r
724 # kqueue returns a write event with the EOF
\r
726 # Note that for read events, EOF may be returned
\r
727 # before all data has been consumed from the
\r
728 # socket buffer, so we only check for EOF on
\r
730 inst.handle_close()
\r
732 if inst.writable():
\r
734 if kevent.flags & _ERROR:
\r
735 inst.handle_close()
\r
738 # ===================================================================
\r
739 # --- choose the better poller for this platform
\r
740 # ===================================================================
\r
742 if hasattr(select, 'epoll'): # epoll() - Linux
\r
744 elif hasattr(select, 'kqueue'): # kqueue() - BSD / OSX
\r
746 elif hasattr(select, 'devpoll'): # /dev/poll - Solaris
\r
748 elif hasattr(select, 'poll'): # poll() - POSIX
\r
750 else: # select() - POSIX and Windows
\r
754 # ===================================================================
\r
755 # --- asyncore dispatchers
\r
756 # ===================================================================
\r
758 # these are overridden in order to register() and unregister()
\r
759 # file descriptors against the new pollers
\r
762 class AsyncChat(asynchat.async_chat):
\r
763 """Same as asynchat.async_chat, only working with the new IO poller
\r
764 and being more clever in avoid registering for read events when
\r
768 def __init__(self, sock=None, ioloop=None):
\r
769 self.ioloop = ioloop or IOLoop.instance()
\r
770 self._wanted_io_events = self.ioloop.READ
\r
771 self._current_io_events = self.ioloop.READ
\r
772 self._closed = False
\r
773 self._closing = False
\r
774 self._fileno = sock.fileno() if sock else None
\r
776 asynchat.async_chat.__init__(self, sock)
\r
778 # --- IO loop related methods
\r
780 def add_channel(self, map=None, events=None):
\r
781 assert self._fileno, repr(self._fileno)
\r
782 events = events if events is not None else self.ioloop.READ
\r
783 self.ioloop.register(self._fileno, self, events)
\r
784 self._wanted_io_events = events
\r
785 self._current_io_events = events
\r
787 def del_channel(self, map=None):
\r
788 if self._fileno is not None:
\r
789 self.ioloop.unregister(self._fileno)
\r
791 def modify_ioloop_events(self, events, logdebug=False):
\r
792 if not self._closed:
\r
793 assert self._fileno, repr(self._fileno)
\r
794 if self._fileno not in self.ioloop.socket_map:
\r
796 "call: modify_ioloop_events(), fd was no longer in "
\r
797 "socket_map, had to register() it again", inst=self)
\r
798 self.add_channel(events=events)
\r
800 if events != self._current_io_events:
\r
802 if events == self.ioloop.READ:
\r
804 elif events == self.ioloop.WRITE:
\r
806 elif events == self.ioloop.READ | self.ioloop.WRITE:
\r
810 debug("call: IOLoop.modify(); setting %r IO events" % (
\r
812 self.ioloop.modify(self._fileno, events)
\r
813 self._current_io_events = events
\r
816 "call: modify_ioloop_events(), handler had already been "
\r
817 "close()d, skipping modify()", inst=self)
\r
821 def call_later(self, seconds, target, *args, **kwargs):
\r
822 """Same as self.ioloop.call_later but also cancel()s the
\r
823 scheduled function on close().
\r
825 if '_errback' not in kwargs and hasattr(self, 'handle_error'):
\r
826 kwargs['_errback'] = self.handle_error
\r
827 callback = self.ioloop.call_later(seconds, target, *args, **kwargs)
\r
828 self._tasks.append(callback)
\r
831 # --- overridden asynchat methods
\r
833 def connect(self, addr):
\r
834 self.modify_ioloop_events(self.ioloop.WRITE)
\r
835 asynchat.async_chat.connect(self, addr)
\r
837 def connect_af_unspecified(self, addr, source_address=None):
\r
838 """Same as connect() but guesses address family from addr.
\r
839 Return the address family just determined.
\r
841 assert self.socket is None
\r
843 err = "getaddrinfo() returned an empty list"
\r
844 info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
\r
845 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
\r
848 af, socktype, proto, canonname, sa = res
\r
850 self.create_socket(af, socktype)
\r
852 if source_address[0].startswith('::ffff:'):
\r
853 # In this scenario, the server has an IPv6 socket, but
\r
854 # the remote client is using IPv4 and its address is
\r
855 # represented as an IPv4-mapped IPv6 address which
\r
856 # looks like this ::ffff:151.12.5.65, see:
\r
857 # http://en.wikipedia.org/wiki/IPv6\
\r
858 # IPv4-mapped_addresses
\r
859 # http://tools.ietf.org/html/rfc3493.html#section-3.7
\r
860 # We truncate the first bytes to make it look like a
\r
861 # common IPv4 address.
\r
862 source_address = (source_address[0][7:],
\r
864 self.bind(source_address)
\r
865 self.connect((host, port))
\r
866 except socket.error as _:
\r
868 if self.socket is not None:
\r
869 self.socket.close()
\r
874 if self.socket is None:
\r
876 raise socket.error(err)
\r
879 # send() and recv() overridden as a fix around various bugs:
\r
880 # - http://bugs.python.org/issue1736101
\r
881 # - https://github.com/giampaolo/pyftpdlib/issues/104
\r
882 # - https://github.com/giampaolo/pyftpdlib/issues/109
\r
884 def send(self, data):
\r
886 return self.socket.send(data)
\r
887 except socket.error as err:
\r
888 debug("call: send(), err: %s" % err, inst=self)
\r
889 if err.errno in _ERRNOS_RETRY:
\r
891 elif err.errno in _ERRNOS_DISCONNECTED:
\r
892 self.handle_close()
\r
897 def recv(self, buffer_size):
\r
899 data = self.socket.recv(buffer_size)
\r
900 except socket.error as err:
\r
901 debug("call: recv(), err: %s" % err, inst=self)
\r
902 if err.errno in _ERRNOS_DISCONNECTED:
\r
903 self.handle_close()
\r
905 elif err.errno in _ERRNOS_RETRY:
\r
911 # a closed connection is indicated by signaling
\r
912 # a read condition, and having recv() return 0.
\r
913 self.handle_close()
\r
918 def handle_read(self):
\r
920 asynchat.async_chat.handle_read(self)
\r
922 # This can be raised by (the overridden) recv().
\r
925 def initiate_send(self):
\r
926 asynchat.async_chat.initiate_send(self)
\r
927 if not self._closed:
\r
928 # if there's still data to send we want to be ready
\r
929 # for writing, else we're only intereseted in reading
\r
930 if not self.producer_fifo:
\r
931 wanted = self.ioloop.READ
\r
933 # In FTPHandler, we also want to listen for user input
\r
934 # hence the READ. DTPHandler has its own initiate_send()
\r
935 # which will either READ or WRITE.
\r
936 wanted = self.ioloop.READ | self.ioloop.WRITE
\r
937 if self._wanted_io_events != wanted:
\r
938 self.ioloop.modify(self._fileno, wanted)
\r
939 self._wanted_io_events = wanted
\r
941 debug("call: initiate_send(); called with no connection",
\r
944 def close_when_done(self):
\r
945 if len(self.producer_fifo) == 0:
\r
946 self.handle_close()
\r
948 self._closing = True
\r
949 asynchat.async_chat.close_when_done(self)
\r
952 if not self._closed:
\r
953 self._closed = True
\r
955 asynchat.async_chat.close(self)
\r
957 for fun in self._tasks:
\r
961 logger.error(traceback.format_exc())
\r
963 self._closed = True
\r
964 self._closing = False
\r
965 self.connected = False
\r
968 class Connector(AsyncChat):
\r
969 """Same as base AsyncChat and supposed to be used for
\r
973 def add_channel(self, map=None, events=None):
\r
974 AsyncChat.add_channel(self, map=map, events=self.ioloop.WRITE)
\r
977 class Acceptor(AsyncChat):
\r
978 """Same as base AsyncChat and supposed to be used to
\r
979 accept new connections.
\r
982 def add_channel(self, map=None, events=None):
\r
983 AsyncChat.add_channel(self, map=map, events=self.ioloop.READ)
\r
985 def bind_af_unspecified(self, addr):
\r
986 """Same as bind() but guesses address family from addr.
\r
987 Return the address family just determined.
\r
989 assert self.socket is None
\r
992 # When using bind() "" is a symbolic name meaning all
\r
993 # available interfaces. People might not know we're
\r
994 # using getaddrinfo() internally, which uses None
\r
995 # instead of "", so we'll make the conversion for them.
\r
997 err = "getaddrinfo() returned an empty list"
\r
998 info = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
\r
999 socket.SOCK_STREAM, 0, socket.AI_PASSIVE)
\r
1001 self.socket = None
\r
1002 self.del_channel()
\r
1003 af, socktype, proto, canonname, sa = res
\r
1005 self.create_socket(af, socktype)
\r
1006 self.set_reuse_addr()
\r
1008 except socket.error as _:
\r
1010 if self.socket is not None:
\r
1011 self.socket.close()
\r
1012 self.del_channel()
\r
1013 self.socket = None
\r
1016 if self.socket is None:
\r
1017 self.del_channel()
\r
1018 raise socket.error(err)
\r
1021 def listen(self, num):
\r
1022 AsyncChat.listen(self, num)
\r
1023 # XXX - this seems to be necessary, otherwise kqueue.control()
\r
1024 # won't return listening fd events
\r
1026 if isinstance(self.ioloop, Kqueue):
\r
1027 self.ioloop.modify(self._fileno, self.ioloop.READ)
\r
1031 def handle_accept(self):
\r
1033 sock, addr = self.accept()
\r
1035 # sometimes accept() might return None, see:
\r
1036 # https://github.com/giampaolo/pyftpdlib/issues/91
\r
1037 debug("call: handle_accept(); accept() returned None", self)
\r
1039 except socket.error as err:
\r
1040 # ECONNABORTED might be thrown on *BSD, see:
\r
1041 # https://github.com/giampaolo/pyftpdlib/issues/105
\r
1042 if err.errno != errno.ECONNABORTED:
\r
1045 debug("call: handle_accept(); accept() returned ECONNABORTED",
\r
1048 # sometimes addr == None instead of (ip, port) (see issue 104)
\r
1049 if addr is not None:
\r
1050 self.handle_accepted(sock, addr)
\r
1052 def handle_accepted(self, sock, addr):
\r
1054 self.log_info('unhandled accepted event', 'warning')
\r
1056 # overridden for convenience; avoid to reuse address on Windows
\r
1057 if (os.name in ('nt', 'ce')) or (sys.platform == 'cygwin'):
\r
1058 def set_reuse_addr(self):
\r