Blob Blame History Raw
diff --git a/scripts/urlgrabber-ext-down b/scripts/urlgrabber-ext-down
new file mode 100755
index 0000000..c37e6a8
--- /dev/null
+++ b/scripts/urlgrabber-ext-down
@@ -0,0 +1,55 @@
+#! /usr/bin/python
+#  A very simple external downloader
+
+import time, os, errno, sys
+from urlgrabber.grabber import \
+    _readlines, URLGrabberOptions, _loads, \
+    PyCurlFileObject, URLGrabError
+
+def write(fmt, *arg):
+    try: os.write(1, fmt % arg)
+    except OSError, e:
+        if e.arg[0] != errno.EPIPE: raise
+        sys.exit(1)
+
+class ProxyProgress:
+    def start(self, *d1, **d2):
+        self.next_update = 0
+    def update(self, _amount_read):
+        t = time.time()
+        if t < self.next_update: return
+        self.next_update = t + 0.31
+        write('%d %d\n', self._id, _amount_read)
+
+def main():
+    import signal
+    signal.signal(signal.SIGINT, lambda n, f: sys.exit(1))
+    cnt = 0
+    while True:
+        lines = _readlines(0)
+        if not lines: break
+        for line in lines:
+            cnt += 1
+            opts = URLGrabberOptions()
+            opts._id = cnt
+            for k in line.split(' '):
+                k, v = k.split('=', 1)
+                setattr(opts, k, _loads(v))
+            if opts.progress_obj:
+                opts.progress_obj = ProxyProgress()
+                opts.progress_obj._id = cnt
+            tm = time.time()
+            try:
+                fo = PyCurlFileObject(opts.url, opts.filename, opts)
+                fo._do_grab()
+                fo.fo.close()
+                size = fo._amount_read
+                dlsz = size - fo._reget_length
+                ug_err = 'OK'
+            except URLGrabError, e:
+                size = dlsz = 0
+                ug_err = '%d %s' % e.args
+            write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err)
+
+if __name__ == '__main__':
+    main()
diff --git a/setup.py b/setup.py
index d0b87b8..bfa4a18 100644
--- a/setup.py
+++ b/setup.py
@@ -15,8 +15,10 @@ url = _urlgrabber.__url__
 packages = ['urlgrabber']
 package_dir = {'urlgrabber':'urlgrabber'}
 scripts = ['scripts/urlgrabber']
-data_files = [('share/doc/' + name + '-' + version,
-               ['README','LICENSE', 'TODO', 'ChangeLog'])]
+data_files = [
+    ('share/doc/' + name + '-' + version, ['README','LICENSE', 'TODO', 'ChangeLog']),
+    ('libexec', ['scripts/urlgrabber-ext-down']),
+]
 options = { 'clean' : { 'all' : 1 } }
 classifiers = [
         'Development Status :: 4 - Beta',
diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py
index 38ae1f7..094be77 100644
--- a/urlgrabber/grabber.py
+++ b/urlgrabber/grabber.py
@@ -263,6 +263,33 @@ GENERAL ARGUMENTS (kwargs)
     What type of name to IP resolving to use, default is to do both IPV4 and
     IPV6.
 
+  async = (key, limit)
+
+    When this option is set, the urlgrab() is not processed immediately
+    but queued.  parallel_wait() then processes grabs in parallel, limiting
+    the numer of connections in each 'key' group to at most 'limit'.
+
+  max_connections
+
+    The global connection limit.
+
+  timedhosts
+
+    The filename of the host download statistics.  If defined, urlgrabber
+    will update the stats at the end of every download.  At the end of
+    parallel_wait(), the updated stats are saved.  If synchronous grabs
+    are used, you should call th_save().
+
+  default_speed, half_life
+
+    These options only affect the async mirror selection code.
+    The default_speed option sets the speed estimate for mirrors
+    we have never downloaded from, and defaults to 1 MBps.
+
+    The speed estimate also drifts exponentially from the speed
+    actually measured to the default speed, with default
+    period of 30 days.
+
 
 RETRY RELATED ARGUMENTS
 
@@ -343,6 +370,15 @@ RETRY RELATED ARGUMENTS
     but it cannot (without severe trickiness) prevent the exception
     from being raised.
 
+  failfunc = None
+
+    The callback that gets called when urlgrab request fails.
+    If defined, urlgrab() calls it instead of raising URLGrabError.
+    Callback syntax is identical to failure_callback.
+
+    Contrary to failure_callback, it's called only once.  It's primary
+    purpose is to use urlgrab() without a try/except block.
+
   interrupt_callback = None
 
     This callback is called if KeyboardInterrupt is received at any
@@ -444,7 +480,7 @@ import pycurl
 from ftplib import parse150
 from StringIO import StringIO
 from httplib import HTTPException
-import socket
+import socket, select, fcntl
 from byterange import range_tuple_normalize, range_tuple_to_header, RangeError
 
 try:
@@ -878,6 +914,7 @@ class URLGrabberOptions:
         self.retry = None
         self.retrycodes = [-1,2,4,5,6,7]
         self.checkfunc = None
+        self.failfunc = _do_raise
         self.copy_local = 0
         self.close_connection = 0
         self.range = None
@@ -886,6 +923,7 @@ class URLGrabberOptions:
         self.keepalive = 1
         self.proxies = None
         self.libproxy = False
+        self.proxy = None
         self.reget = None
         self.failure_callback = None
         self.interrupt_callback = None
@@ -913,6 +951,12 @@ class URLGrabberOptions:
         self.size = None # if we know how big the thing we're getting is going
                          # to be. this is ultimately a MAXIMUM size for the file
         self.max_header_size = 2097152 #2mb seems reasonable for maximum header size
+        self.async = None # blocking by default
+        self.mirror_group = None
+        self.max_connections = 5
+        self.timedhosts = None
+        self.half_life = 30*24*60*60 # 30 days
+        self.default_speed = 1e6 # 1 MBit
         
     def __repr__(self):
         return self.format()
@@ -932,6 +976,17 @@ class URLGrabberOptions:
         s = s + indent + '}'
         return s
 
+def _do_raise(obj):
+    raise obj.exception
+
+def _run_callback(cb, obj):
+    if not cb:
+        return
+    if callable(cb):
+        return cb(obj)
+    cb, arg, karg = cb
+    return cb(obj, *arg, **karg)
+
 class URLGrabber(object):
     """Provides easy opening of URLs with a variety of options.
     
@@ -977,10 +1032,9 @@ class URLGrabber(object):
             if DEBUG: DEBUG.info('exception: %s', exception)
             if callback:
                 if DEBUG: DEBUG.info('calling callback: %s', callback)
-                cb_func, cb_args, cb_kwargs = self._make_callback(callback)
                 obj = CallbackObject(exception=exception, url=args[0],
                                      tries=tries, retry=opts.retry)
-                cb_func(obj, *cb_args, **cb_kwargs)
+                _run_callback(callback, obj)
 
             if (opts.retry is None) or (tries == opts.retry):
                 if DEBUG: DEBUG.info('retries exceeded, re-raising')
@@ -1043,30 +1097,36 @@ class URLGrabber(object):
 
             elif not opts.range:
                 if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                       self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.filename = path
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)        
+                    obj = CallbackObject(filename=path, url=url)
+                    _run_callback(opts.checkfunc, obj)
                 return path
         
+        if opts.async:
+            opts.url = url
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_queue.append(opts)
+            return filename
+
         def retryfunc(opts, url, filename):
+            tm = time.time()
             fo = PyCurlFileObject(url, filename, opts)
             try:
                 fo._do_grab()
+                _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None)
                 if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                             self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.filename = filename
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                    obj = CallbackObject(filename=filename, url=url)
+                    _run_callback(opts.checkfunc, obj)
             finally:
                 fo.close()
             return filename
         
-        return self._retry(opts, retryfunc, url, filename)
+        try:
+            return self._retry(opts, retryfunc, url, filename)
+        except URLGrabError, e:
+            _TH.update(url, 0, 0, e)
+            opts.exception = e
+            return _run_callback(opts.failfunc, opts)
     
     def urlread(self, url, limit=None, **kwargs):
         """read the url into a string, up to 'limit' bytes
@@ -1095,12 +1155,8 @@ class URLGrabber(object):
                 else: s = fo.read(limit)
 
                 if not opts.checkfunc is None:
-                    cb_func, cb_args, cb_kwargs = \
-                             self._make_callback(opts.checkfunc)
-                    obj = CallbackObject()
-                    obj.data = s
-                    obj.url = url
-                    apply(cb_func, (obj, )+cb_args, cb_kwargs)
+                    obj = CallbackObject(data=s, url=url)
+                    _run_callback(opts.checkfunc, obj)
             finally:
                 fo.close()
             return s
@@ -1115,6 +1171,7 @@ class URLGrabber(object):
         return s
         
     def _make_callback(self, callback_obj):
+        # not used, left for compatibility
         if callable(callback_obj):
             return callback_obj, (), {}
         else:
@@ -1346,14 +1403,8 @@ class PyCurlFileObject(object):
             return
         
         try:
-            e = None
             self.curl_obj.perform()
-        except pycurl.error, e: pass
-        self._do_perform_exc(e)
-
-    def _do_perform_exc(self, e):
-        # handle pycurl exception 'e'
-        if e:
+        except pycurl.error, e:
             # XXX - break some of these out a bit more clearly
             # to other URLGrabErrors from 
             # http://curl.haxx.se/libcurl/c/libcurl-errors.html
@@ -1607,7 +1658,22 @@ class PyCurlFileObject(object):
         _was_filename = False
         if type(self.filename) in types.StringTypes and self.filename:
             _was_filename = True
-            self._do_open_fo()
+            self._prog_reportname = str(self.filename)
+            self._prog_basename = os.path.basename(self.filename)
+            
+            if self.append: mode = 'ab'
+            else: mode = 'wb'
+
+            if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
+                                 (self.filename, mode))
+            try:
+                self.fo = open(self.filename, mode)
+            except IOError, e:
+                err = URLGrabError(16, _(\
+                  'error opening local file from %s, IOError: %s') % (self.url, e))
+                err.url = self.url
+                raise err
+
         else:
             self._prog_reportname = 'MEMORY'
             self._prog_basename = 'MEMORY'
@@ -1627,7 +1693,29 @@ class PyCurlFileObject(object):
             raise e
     
         if _was_filename:
-            self._do_close_fo()
+            # close it up
+            self.fo.flush()
+            self.fo.close()
+
+            # Set the URL where we got it from:
+            if xattr is not None:
+                # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
+                try:
+                    xattr.set(self.filename, 'user.xdg.origin.url', self.url)
+                except:
+                    pass # URL too long. = IOError ... ignore everything.
+
+            # set the time
+            mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
+            if mod_time != -1:
+                try:
+                    os.utime(self.filename, (mod_time, mod_time))
+                except OSError, e:
+                    err = URLGrabError(16, _(\
+                      'error setting timestamp on file %s from %s, OSError: %s') 
+                              % (self.filename, self.url, e))
+                    err.url = self.url
+                    raise err
             # re open it
             try:
                 self.fo = open(self.filename, 'r')
@@ -1643,47 +1731,6 @@ class PyCurlFileObject(object):
 
         self._complete = True
     
-    def _do_open_fo(self):
-        self._prog_reportname = str(self.filename)
-        self._prog_basename = os.path.basename(self.filename)
-        if self.append: mode = 'ab'
-        else: mode = 'wb'
-
-        if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \
-                             (self.filename, mode))
-        try:
-            self.fo = open(self.filename, mode)
-        except IOError, e:
-            err = URLGrabError(16, _(\
-              'error opening local file from %s, IOError: %s') % (self.url, e))
-            err.url = self.url
-            raise err
-
-    def _do_close_fo(self):
-        # close it up
-        self.fo.flush()
-        self.fo.close()
-
-        # Set the URL where we got it from:
-        if xattr is not None:
-            # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes
-            try:
-                xattr.set(self.filename, 'user.xdg.origin.url', self.url)
-            except:
-                pass # URL too long. = IOError ... ignore everything.
-
-        # set the time
-        mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME)
-        if mod_time != -1:
-            try:
-                os.utime(self.filename, (mod_time, mod_time))
-            except OSError, e:
-                err = URLGrabError(16, _(\
-                  'error setting timestamp on file %s from %s, OSError: %s') 
-                          % (self.filename, self.url, e))
-                err.url = self.url
-                raise err
-
     def _fill_buffer(self, amt=None):
         """fill the buffer to contain at least 'amt' bytes by reading
         from the underlying file object.  If amt is None, then it will
@@ -1858,6 +1905,425 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0,
 
         
 #####################################################################
+#  Serializer + parser: A replacement of the rather bulky Json code.
+#
+# - handles basic python literals, lists and tuples.
+# - serialized strings never contain ' ' or '\n'
+#
+#####################################################################
+
+_quoter_map = {}
+for c in '%[(,)] \n':
+    _quoter_map[c] = '%%%02x' % ord(c)
+del c
+
+def _dumps(v):
+    if v is None: return 'None'
+    if v is True: return 'True'
+    if v is False: return 'False'
+    if type(v) in (int, long, float):
+        return str(v)
+    if type(v) == unicode:
+        v = v.encode('UTF8')
+    if type(v) == str:
+        def quoter(c): return _quoter_map.get(c, c)
+        return "'%s'" % ''.join(map(quoter, v))
+    if type(v) == tuple:
+        return "(%s)" % ','.join(map(_dumps, v))
+    if type(v) == list:
+        return "[%s]" % ','.join(map(_dumps, v))
+    raise TypeError, 'Can\'t serialize %s' % v
+
+def _loads(s):
+    def decode(v):
+        if v == 'None': return None
+        if v == 'True': return True
+        if v == 'False': return False
+        try: return int(v)
+        except ValueError: pass
+        try: return float(v)
+        except ValueError: pass
+        if len(v) >= 2 and v[0] == v[-1] == "'":
+            ret = []; i = 1
+            while True:
+                j = v.find('%', i)
+                ret.append(v[i:j]) # skips the final "'"
+                if j == -1: break
+                ret.append(chr(int(v[j + 1:j + 3], 16)))
+                i = j + 3
+            v = ''.join(ret)
+        return v
+    stk = None
+    l = []
+    i = j = 0
+    while True:
+        if j == len(s) or s[j] in ',)]':
+            if j > i:
+                l.append(decode(s[i:j]))
+            if j == len(s): break
+            if s[j] in ')]':
+                if s[j] == ')':
+                    l = tuple(l)
+                stk[0].append(l)
+                l, stk = stk
+            i = j = j + 1
+        elif s[j] in '[(':
+            stk = l, stk
+            l = []
+            i = j = j + 1
+        else:
+            j += 1 # safe because '[(,)]' are quoted
+    if stk: raise ValueError
+    if len(l) == 1: l = l[0]
+    return l
+
+
+#####################################################################
+#  External downloader process
+#####################################################################
+
+def _readlines(fd):
+    buf = os.read(fd, 4096)
+    if not buf: return None
+    # whole lines only, no buffering
+    while buf[-1] != '\n':
+        buf += os.read(fd, 4096)
+    return buf[:-1].split('\n')
+
+import subprocess
+
+class _ExternalDownloader:
+    def __init__(self):
+        self.popen = subprocess.Popen(
+            '/usr/libexec/urlgrabber-ext-down',
+            stdin = subprocess.PIPE,
+            stdout = subprocess.PIPE,
+        )
+        self.stdin  = self.popen.stdin.fileno()
+        self.stdout = self.popen.stdout.fileno()
+        self.running = {}
+        self.cnt = 0
+
+    # list of options we pass to downloader
+    _options = (
+        'url', 'filename',
+        'timeout', 'close_connection', 'keepalive',
+        'throttle', 'bandwidth', 'range', 'reget',
+        'user_agent', 'http_headers', 'ftp_headers',
+        'proxy', 'prefix', 'username', 'password',
+        'ssl_ca_cert',
+        'ssl_cert', 'ssl_cert_type',
+        'ssl_key', 'ssl_key_type',
+        'ssl_key_pass',
+        'ssl_verify_peer', 'ssl_verify_host',
+        'size', 'max_header_size', 'ip_resolve',
+    )
+
+    def start(self, opts):
+        arg = []
+        for k in self._options:
+            v = getattr(opts, k)
+            if v is None: continue
+            arg.append('%s=%s' % (k, _dumps(v)))
+        if opts.progress_obj:
+            arg.append('progress_obj=True')
+        arg = ' '.join(arg)
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+
+        self.cnt += 1
+        self.running[self.cnt] = opts
+        os.write(self.stdin, arg +'\n')
+
+    def perform(self):
+        ret = []
+        lines = _readlines(self.stdout)
+        if not lines:
+            if DEBUG: DEBUG.info('downloader died')
+            raise KeyboardInterrupt
+        for line in lines:
+            # parse downloader output
+            line = line.split(' ', 5)
+            _id, size = map(int, line[:2])
+            if len(line) == 2:
+                opts = self.running[_id]
+                m = opts.progress_obj
+                if m:
+                    if not m.last_update_time:
+                        m.start(text = opts.text)
+                    m.update(size)
+                continue
+            # job done
+            opts = self.running.pop(_id)
+            if line[4] == 'OK':
+                ug_err = None
+                if DEBUG: DEBUG.info('success')
+            else:
+                ug_err = URLGrabError(int(line[4]), line[5])
+                if DEBUG: DEBUG.info('failure: %s', err)
+            _TH.update(opts.url, int(line[2]), float(line[3]), ug_err)
+            ret.append((opts, size, ug_err))
+        return ret
+
+    def abort(self):
+        self.popen.stdin.close()
+        self.popen.stdout.close()
+        self.popen.wait()
+
+class _ExternalDownloaderPool:
+    def __init__(self):
+        self.epoll = select.epoll()
+        self.running = {}
+        self.cache = {}
+
+    def start(self, opts):
+        host = urlparse.urlsplit(opts.url).netloc
+        dl = self.cache.pop(host, None)
+        if not dl:
+            dl = _ExternalDownloader()
+            fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD)
+            fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC)
+        self.epoll.register(dl.stdout, select.EPOLLIN)
+        self.running[dl.stdout] = dl
+        dl.start(opts)
+
+    def perform(self):
+        ret = []
+        for fd, event in self.epoll.poll():
+            assert event & select.EPOLLIN
+            done = self.running[fd].perform()
+            if not done: continue
+            assert len(done) == 1
+            ret.extend(done)
+
+            # dl finished, move it to the cache
+            host = urlparse.urlsplit(done[0][0].url).netloc
+            if host in self.cache: self.cache[host].abort()
+            self.epoll.unregister(fd)
+            self.cache[host] = self.running.pop(fd)
+        return ret
+
+    def abort(self):
+        for dl in self.running.values():
+            self.epoll.unregister(dl.stdout)
+            dl.abort()
+        for dl in self.cache.values():
+            dl.abort()
+
+
+#####################################################################
+#  High level async API
+#####################################################################
+
+_async_queue = []
+
+def parallel_wait(meter = 'text'):
+    '''Process queued requests in parallel.
+    '''
+
+    if meter:
+        count = total = 0
+        for opts in _async_queue:
+            count += 1
+            total += opts.size
+        if meter == 'text':
+            from progress import TextMultiFileMeter
+            meter = TextMultiFileMeter()
+        meter.start(count, total)
+
+    dl = _ExternalDownloaderPool()
+    host_con = {} # current host connection counts
+
+    def start(opts, tries):
+        key, limit = opts.async
+        host_con[key] = host_con.get(key, 0) + 1
+        opts.tries = tries
+        opts.progress_obj = meter and meter.newMeter()
+        if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url)
+        dl.start(opts)
+
+    def perform():
+        for opts, size, ug_err in dl.perform():
+            key, limit = opts.async
+            host_con[key] -= 1
+            if meter:
+                m = opts.progress_obj
+                m.basename = os.path.basename(opts.filename)
+                if ug_err:
+                    m.failure(ug_err.args[1])
+                else:
+                    # file size might have changed
+                    meter.re.total += size - opts.size
+                    m.end(size)
+                meter.removeMeter(m)
+
+            if ug_err is None:
+                if opts.checkfunc:
+                    try: _run_callback(opts.checkfunc, opts)
+                    except URLGrabError, ug_err: pass
+                if ug_err is None:
+                    continue
+
+            retry = opts.retry or 0
+            if opts.failure_callback:
+                opts.exception = ug_err
+                try: _run_callback(opts.failure_callback, opts)
+                except URLGrabError, ug_err:
+                    retry = 0 # no retries
+            if opts.tries < retry and ug_err.args[0] in opts.retrycodes:
+                start(opts, opts.tries + 1) # simple retry
+                continue
+
+            if opts.mirror_group:
+                mg, failed = opts.mirror_group
+                opts.mirror = key
+                opts.exception = ug_err
+                action = _run_callback(mg.failure_callback, opts)
+                if not (action and action.get('fail')):
+                    # mask this mirror and retry
+                    failed.add(key)
+                    _async_queue.append(opts)
+                    continue
+
+            # urlgrab failed
+            opts.exception = ug_err
+            _run_callback(opts.failfunc, opts)
+
+    try:
+        idx = 0
+        while True:
+            if idx >= len(_async_queue):
+                # the queue is empty
+                if not dl.running: break
+                # pending dl may extend it
+                perform()
+                continue
+
+            # handle next request
+            opts = _async_queue[idx]
+            idx += 1
+
+            # check global limit
+            while len(dl.running) >= opts.max_connections:
+                perform()
+
+            if opts.mirror_group:
+                mg, failed = opts.mirror_group
+
+                # find the best mirror
+                best = None
+                for mirror in mg.mirrors:
+                    key = mirror['mirror']
+                    if key in failed: continue
+
+                    # estimate mirror speed
+                    speed = _TH.estimate(key)
+                    speed /= 1 + host_con.get(key, 0)
+                    if best is None or speed > best_speed:
+                        best = mirror
+                        best_speed = speed
+
+                if best is None:
+                    opts.exception = URLGrabError(256, _('No more mirrors to try.'))
+                    _run_callback(opts.failfunc, opts)
+                    continue
+
+                # update the current mirror and limit
+                key = best['mirror']
+                limit = best.get('kwargs', {}).get('max_connections', 3)
+                opts.async = key, limit
+
+                # update URL and proxy
+                url = mg._join_url(key, opts.relative_url)
+                url, parts = opts.urlparser.parse(url, opts)
+                opts.find_proxy(url, parts[0])
+                opts.url = url
+
+            # check host limit, then start
+            key, limit = opts.async
+            while host_con.get(key, 0) >= limit:
+                perform()
+            start(opts, 1)
+    except IOError, e:
+        if e.errno != 4: raise
+        raise KeyboardInterrupt
+
+    finally:
+        dl.abort()
+        if meter: meter.end()
+        del _async_queue[:]
+        _TH.save()
+
+
+#####################################################################
+#  Host bandwidth estimation
+#####################################################################
+
+class _TH:
+    hosts = {}
+    dirty = None
+
+    @staticmethod
+    def load():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is None:
+            try:
+                for line in open(filename):
+                    host, speed, fail, ts = line.split()
+                    _TH.hosts[host] = int(speed), int(fail), int(ts)
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def save():
+        filename = default_grabber.opts.timedhosts
+        if filename and _TH.dirty is True:
+            tmp = '%s.%d' % (filename, os.getpid())
+            try:
+                f = open(tmp, 'w')
+                for host in _TH.hosts:
+                    f.write(host + ' %d %d %d\n' % _TH.hosts[host])
+                f.close()
+                os.rename(tmp, filename)
+            except IOError: pass
+            _TH.dirty = False
+
+    @staticmethod
+    def update(url, dl_size, dl_time, ug_err):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0)
+        now = time.time()
+
+        if ug_err is None:
+            # k1: the older, the less useful
+            # k2: if it was <1MiB, don't trust it much
+            # speeds vary, use 10:1 smoothing
+            k1 = 2**((ts - now) / default_grabber.opts.half_life)
+            k2 = min(dl_size / 1e6, 1.0) / 10
+            speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2)
+            fail = 0
+        elif getattr(ug_err, 'code', None) == 404:
+            fail = 0 # alive, at least
+        else:
+            fail += 1 # seems dead
+
+        _TH.hosts[host] = speed, fail, now
+        _TH.dirty = True
+
+    @staticmethod
+    def estimate(url):
+        _TH.load()
+        host = urlparse.urlsplit(url).netloc
+        default_speed = default_grabber.opts.default_speed
+        try: speed, fail, ts = _TH.hosts[host]
+        except KeyError: return default_speed
+
+        speed *= 2**-fail
+        k = 2**((ts - time.time()) / default_grabber.opts.half_life)
+        speed = k * speed + (1 - k) * default_speed
+        return speed
+
+#####################################################################
 #  TESTING
 def _main_test():
     try: url, filename = sys.argv[1:3]
diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py
index 8731aed..d699b61 100644
--- a/urlgrabber/mirror.py
+++ b/urlgrabber/mirror.py
@@ -76,6 +76,9 @@ CUSTOMIZATION
        'grabber' is omitted, the default grabber will be used.  If
        kwargs are omitted, then (duh) they will not be used.
 
+       kwarg 'max_connections' is used to store the max connection
+       limit of this mirror.
+
     3) Pass keyword arguments when instantiating the mirror group.
        See, for example, the failure_callback argument.
 
@@ -91,6 +94,7 @@ import random
 import thread  # needed for locking to make this threadsafe
 
 from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8
+from grabber import _run_callback, _do_raise, _async_queue
 
 def _(st): 
     return st
@@ -254,7 +258,7 @@ class MirrorGroup:
     # if these values are found in **kwargs passed to one of the urlXXX
     # methods, they will be stripped before getting passed on to the
     # grabber
-    options = ['default_action', 'failure_callback']
+    options = ['default_action', 'failure_callback', 'failfunc']
     
     def _process_kwargs(self, kwargs):
         self.failure_callback = kwargs.get('failure_callback')
@@ -403,10 +407,25 @@ class MirrorGroup:
                 self._failure(gr, obj)
 
     def urlgrab(self, url, filename=None, **kwargs):
+        if kwargs.get('async'):
+            opts = self.grabber.opts.derive(**kwargs)
+            opts.mirror_group = self, set()
+            opts.relative_url = _to_utf8(url)
+
+            opts.url = 'http://tbd'
+            opts.filename = filename
+            opts.size = int(opts.size or 0)
+            _async_queue.append(opts)
+            return filename
+
         kw = dict(kwargs)
         kw['filename'] = filename
         func = 'urlgrab'
-        return self._mirror_try(func, url, kw)
+        try:
+            return self._mirror_try(func, url, kw)
+        except URLGrabError, e:
+            obj = CallbackObject(url=url, filename=filename, exception=e, **kwargs)
+            return _run_callback(kwargs.get('failfunc', _do_raise), obj)
     
     def urlopen(self, url, **kwargs):
         kw = dict(kwargs)
diff --git a/urlgrabber/progress.py b/urlgrabber/progress.py
index 3d7e99a..4c126c5 100644
--- a/urlgrabber/progress.py
+++ b/urlgrabber/progress.py
@@ -576,7 +576,6 @@ class TextMultiFileMeter(MultiFileMeter):
             self.fo.write(out)
         finally:
             self._lock.release()
-        self._do_update_meter(meter, now)
 
     def _do_failure_meter(self, meter, message, now):
         self._lock.acquire()
@@ -599,15 +598,6 @@ class TextMultiFileMeter(MultiFileMeter):
             pass
         finally:
             self._lock.release()
-
-    def _do_end(self, now):
-        self._do_update_meter(None, now)
-        self._lock.acquire()
-        try:
-            self.fo.write('\n')
-            self.fo.flush()
-        finally:
-            self._lock.release()
         
 ######################################################################
 # support classes and functions