From 9f5eee4fa2ee9006f284b90ce4f2843d5227ccde Mon Sep 17 00:00:00 2001 From: Zdeněk Pavlas Date: Jul 20 2012 11:30:45 +0000 Subject: Update to latest head --- diff --git a/file-url-profiling.patch b/file-url-profiling.patch deleted file mode 100644 index 610f5d3..0000000 --- a/file-url-profiling.patch +++ /dev/null @@ -1,97 +0,0 @@ -commit 7c74f526dd761b647d6bb6a7b7d6c285fe78bdb8 -Author: Zdeněk Pavlas -Date: Fri May 18 15:38:44 2012 +0200 - - timedhosts: fix file:// profiling. BZ 822632. - - - Do not profile absolute file:// URLs. - - Give a hint to _TH.update() which baseurl was used - so we may profile file:// mirrors, too. - - Strip username and password from stored hostnames. - -diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py -index 094be77..be85f92 100644 ---- a/urlgrabber/grabber.py -+++ b/urlgrabber/grabber.py -@@ -2060,7 +2060,7 @@ class _ExternalDownloader: - else: - ug_err = URLGrabError(int(line[4]), line[5]) - if DEBUG: DEBUG.info('failure: %s', err) -- _TH.update(opts.url, int(line[2]), float(line[3]), ug_err) -+ _TH.update(opts.url, int(line[2]), float(line[3]), ug_err, opts.async[0]) - ret.append((opts, size, ug_err)) - return ret - -@@ -2268,7 +2268,7 @@ class _TH: - if filename and _TH.dirty is None: - try: - for line in open(filename): -- host, speed, fail, ts = line.split() -+ host, speed, fail, ts = line.split(' ', 3) - _TH.hosts[host] = int(speed), int(fail), int(ts) - except IOError: pass - _TH.dirty = False -@@ -2288,9 +2288,14 @@ class _TH: - _TH.dirty = False - - @staticmethod -- def update(url, dl_size, dl_time, ug_err): -+ def update(url, dl_size, dl_time, ug_err, baseurl=None): - _TH.load() -- host = urlparse.urlsplit(url).netloc -+ -+ # Use hostname from URL. If it's a file:// URL, use baseurl. -+ # If no baseurl, do not update timedhosts. -+ host = urlparse.urlsplit(url).netloc.split('@')[-1] or baseurl -+ if not host: return -+ - speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0) - now = time.time() - -@@ -2311,9 +2316,12 @@ class _TH: - _TH.dirty = True - - @staticmethod -- def estimate(url): -+ def estimate(baseurl): - _TH.load() -- host = urlparse.urlsplit(url).netloc -+ -+ # Use just the hostname, unless it's a file:// baseurl. -+ host = urlparse.urlsplit(baseurl).netloc.split('@')[-1] or baseurl -+ - default_speed = default_grabber.opts.default_speed - try: speed, fail, ts = _TH.hosts[host] - except KeyError: return default_speed -commit fa6a17c29e9dea3ccd2d384039b305f027a5b75e -Author: Zdeněk Pavlas -Date: Mon May 21 09:06:13 2012 +0200 - - timedhosts: sanity check on dl_time - - - handle the dl_time <= 0 case - - - relative validity of calculated speed now depends - on dl_time instead of dl_size. 
(that's where the - random error is) - -diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py -index be85f92..73e14aa 100644 ---- a/urlgrabber/grabber.py -+++ b/urlgrabber/grabber.py -@@ -2301,11 +2301,12 @@ class _TH: - - if ug_err is None: - # k1: the older, the less useful -- # k2: if it was <1MiB, don't trust it much -+ # k2: <500ms readings are less reliable - # speeds vary, use 10:1 smoothing - k1 = 2**((ts - now) / default_grabber.opts.half_life) -- k2 = min(dl_size / 1e6, 1.0) / 10 -- speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2) -+ k2 = min(dl_time / .500, 1.0) / 10 -+ if k2 > 0: -+ speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2) - fail = 0 - elif getattr(ug_err, 'code', None) == 404: - fail = 0 # alive, at least diff --git a/multi-downloader.patch b/multi-downloader.patch deleted file mode 100644 index 7aeec78..0000000 --- a/multi-downloader.patch +++ /dev/null @@ -1,904 +0,0 @@ -diff --git a/scripts/urlgrabber-ext-down b/scripts/urlgrabber-ext-down -new file mode 100755 -index 0000000..c37e6a8 ---- /dev/null -+++ b/scripts/urlgrabber-ext-down -@@ -0,0 +1,55 @@ -+#! /usr/bin/python -+# A very simple external downloader -+ -+import time, os, errno, sys -+from urlgrabber.grabber import \ -+ _readlines, URLGrabberOptions, _loads, \ -+ PyCurlFileObject, URLGrabError -+ -+def write(fmt, *arg): -+ try: os.write(1, fmt % arg) -+ except OSError, e: -+ if e.arg[0] != errno.EPIPE: raise -+ sys.exit(1) -+ -+class ProxyProgress: -+ def start(self, *d1, **d2): -+ self.next_update = 0 -+ def update(self, _amount_read): -+ t = time.time() -+ if t < self.next_update: return -+ self.next_update = t + 0.31 -+ write('%d %d\n', self._id, _amount_read) -+ -+def main(): -+ import signal -+ signal.signal(signal.SIGINT, lambda n, f: sys.exit(1)) -+ cnt = 0 -+ while True: -+ lines = _readlines(0) -+ if not lines: break -+ for line in lines: -+ cnt += 1 -+ opts = URLGrabberOptions() -+ opts._id = cnt -+ for k in line.split(' '): -+ k, v = k.split('=', 1) -+ setattr(opts, k, _loads(v)) -+ if opts.progress_obj: -+ opts.progress_obj = ProxyProgress() -+ opts.progress_obj._id = cnt -+ tm = time.time() -+ try: -+ fo = PyCurlFileObject(opts.url, opts.filename, opts) -+ fo._do_grab() -+ fo.fo.close() -+ size = fo._amount_read -+ dlsz = size - fo._reget_length -+ ug_err = 'OK' -+ except URLGrabError, e: -+ size = dlsz = 0 -+ ug_err = '%d %s' % e.args -+ write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err) -+ -+if __name__ == '__main__': -+ main() -diff --git a/setup.py b/setup.py -index d0b87b8..bfa4a18 100644 ---- a/setup.py -+++ b/setup.py -@@ -15,8 +15,10 @@ url = _urlgrabber.__url__ - packages = ['urlgrabber'] - package_dir = {'urlgrabber':'urlgrabber'} - scripts = ['scripts/urlgrabber'] --data_files = [('share/doc/' + name + '-' + version, -- ['README','LICENSE', 'TODO', 'ChangeLog'])] -+data_files = [ -+ ('share/doc/' + name + '-' + version, ['README','LICENSE', 'TODO', 'ChangeLog']), -+ ('libexec', ['scripts/urlgrabber-ext-down']), -+] - options = { 'clean' : { 'all' : 1 } } - classifiers = [ - 'Development Status :: 4 - Beta', -diff --git a/urlgrabber/grabber.py b/urlgrabber/grabber.py -index 38ae1f7..094be77 100644 ---- a/urlgrabber/grabber.py -+++ b/urlgrabber/grabber.py -@@ -263,6 +263,33 @@ GENERAL ARGUMENTS (kwargs) - What type of name to IP resolving to use, default is to do both IPV4 and - IPV6. - -+ async = (key, limit) -+ -+ When this option is set, the urlgrab() is not processed immediately -+ but queued. 
parallel_wait() then processes grabs in parallel, limiting -+ the numer of connections in each 'key' group to at most 'limit'. -+ -+ max_connections -+ -+ The global connection limit. -+ -+ timedhosts -+ -+ The filename of the host download statistics. If defined, urlgrabber -+ will update the stats at the end of every download. At the end of -+ parallel_wait(), the updated stats are saved. If synchronous grabs -+ are used, you should call th_save(). -+ -+ default_speed, half_life -+ -+ These options only affect the async mirror selection code. -+ The default_speed option sets the speed estimate for mirrors -+ we have never downloaded from, and defaults to 1 MBps. -+ -+ The speed estimate also drifts exponentially from the speed -+ actually measured to the default speed, with default -+ period of 30 days. -+ - - RETRY RELATED ARGUMENTS - -@@ -343,6 +370,15 @@ RETRY RELATED ARGUMENTS - but it cannot (without severe trickiness) prevent the exception - from being raised. - -+ failfunc = None -+ -+ The callback that gets called when urlgrab request fails. -+ If defined, urlgrab() calls it instead of raising URLGrabError. -+ Callback syntax is identical to failure_callback. -+ -+ Contrary to failure_callback, it's called only once. It's primary -+ purpose is to use urlgrab() without a try/except block. -+ - interrupt_callback = None - - This callback is called if KeyboardInterrupt is received at any -@@ -444,7 +480,7 @@ import pycurl - from ftplib import parse150 - from StringIO import StringIO - from httplib import HTTPException --import socket -+import socket, select, fcntl - from byterange import range_tuple_normalize, range_tuple_to_header, RangeError - - try: -@@ -878,6 +914,7 @@ class URLGrabberOptions: - self.retry = None - self.retrycodes = [-1,2,4,5,6,7] - self.checkfunc = None -+ self.failfunc = _do_raise - self.copy_local = 0 - self.close_connection = 0 - self.range = None -@@ -886,6 +923,7 @@ class URLGrabberOptions: - self.keepalive = 1 - self.proxies = None - self.libproxy = False -+ self.proxy = None - self.reget = None - self.failure_callback = None - self.interrupt_callback = None -@@ -913,6 +951,12 @@ class URLGrabberOptions: - self.size = None # if we know how big the thing we're getting is going - # to be. this is ultimately a MAXIMUM size for the file - self.max_header_size = 2097152 #2mb seems reasonable for maximum header size -+ self.async = None # blocking by default -+ self.mirror_group = None -+ self.max_connections = 5 -+ self.timedhosts = None -+ self.half_life = 30*24*60*60 # 30 days -+ self.default_speed = 1e6 # 1 MBit - - def __repr__(self): - return self.format() -@@ -932,6 +976,17 @@ class URLGrabberOptions: - s = s + indent + '}' - return s - -+def _do_raise(obj): -+ raise obj.exception -+ -+def _run_callback(cb, obj): -+ if not cb: -+ return -+ if callable(cb): -+ return cb(obj) -+ cb, arg, karg = cb -+ return cb(obj, *arg, **karg) -+ - class URLGrabber(object): - """Provides easy opening of URLs with a variety of options. 
- -@@ -977,10 +1032,9 @@ class URLGrabber(object): - if DEBUG: DEBUG.info('exception: %s', exception) - if callback: - if DEBUG: DEBUG.info('calling callback: %s', callback) -- cb_func, cb_args, cb_kwargs = self._make_callback(callback) - obj = CallbackObject(exception=exception, url=args[0], - tries=tries, retry=opts.retry) -- cb_func(obj, *cb_args, **cb_kwargs) -+ _run_callback(callback, obj) - - if (opts.retry is None) or (tries == opts.retry): - if DEBUG: DEBUG.info('retries exceeded, re-raising') -@@ -1043,30 +1097,36 @@ class URLGrabber(object): - - elif not opts.range: - if not opts.checkfunc is None: -- cb_func, cb_args, cb_kwargs = \ -- self._make_callback(opts.checkfunc) -- obj = CallbackObject() -- obj.filename = path -- obj.url = url -- apply(cb_func, (obj, )+cb_args, cb_kwargs) -+ obj = CallbackObject(filename=path, url=url) -+ _run_callback(opts.checkfunc, obj) - return path - -+ if opts.async: -+ opts.url = url -+ opts.filename = filename -+ opts.size = int(opts.size or 0) -+ _async_queue.append(opts) -+ return filename -+ - def retryfunc(opts, url, filename): -+ tm = time.time() - fo = PyCurlFileObject(url, filename, opts) - try: - fo._do_grab() -+ _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None) - if not opts.checkfunc is None: -- cb_func, cb_args, cb_kwargs = \ -- self._make_callback(opts.checkfunc) -- obj = CallbackObject() -- obj.filename = filename -- obj.url = url -- apply(cb_func, (obj, )+cb_args, cb_kwargs) -+ obj = CallbackObject(filename=filename, url=url) -+ _run_callback(opts.checkfunc, obj) - finally: - fo.close() - return filename - -- return self._retry(opts, retryfunc, url, filename) -+ try: -+ return self._retry(opts, retryfunc, url, filename) -+ except URLGrabError, e: -+ _TH.update(url, 0, 0, e) -+ opts.exception = e -+ return _run_callback(opts.failfunc, opts) - - def urlread(self, url, limit=None, **kwargs): - """read the url into a string, up to 'limit' bytes -@@ -1095,12 +1155,8 @@ class URLGrabber(object): - else: s = fo.read(limit) - - if not opts.checkfunc is None: -- cb_func, cb_args, cb_kwargs = \ -- self._make_callback(opts.checkfunc) -- obj = CallbackObject() -- obj.data = s -- obj.url = url -- apply(cb_func, (obj, )+cb_args, cb_kwargs) -+ obj = CallbackObject(data=s, url=url) -+ _run_callback(opts.checkfunc, obj) - finally: - fo.close() - return s -@@ -1115,6 +1171,7 @@ class URLGrabber(object): - return s - - def _make_callback(self, callback_obj): -+ # not used, left for compatibility - if callable(callback_obj): - return callback_obj, (), {} - else: -@@ -1346,14 +1403,8 @@ class PyCurlFileObject(object): - return - - try: -- e = None - self.curl_obj.perform() -- except pycurl.error, e: pass -- self._do_perform_exc(e) -- -- def _do_perform_exc(self, e): -- # handle pycurl exception 'e' -- if e: -+ except pycurl.error, e: - # XXX - break some of these out a bit more clearly - # to other URLGrabErrors from - # http://curl.haxx.se/libcurl/c/libcurl-errors.html -@@ -1607,7 +1658,22 @@ class PyCurlFileObject(object): - _was_filename = False - if type(self.filename) in types.StringTypes and self.filename: - _was_filename = True -- self._do_open_fo() -+ self._prog_reportname = str(self.filename) -+ self._prog_basename = os.path.basename(self.filename) -+ -+ if self.append: mode = 'ab' -+ else: mode = 'wb' -+ -+ if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \ -+ (self.filename, mode)) -+ try: -+ self.fo = open(self.filename, mode) -+ except IOError, e: -+ err = URLGrabError(16, _(\ -+ 'error opening local 
file from %s, IOError: %s') % (self.url, e)) -+ err.url = self.url -+ raise err -+ - else: - self._prog_reportname = 'MEMORY' - self._prog_basename = 'MEMORY' -@@ -1627,7 +1693,29 @@ class PyCurlFileObject(object): - raise e - - if _was_filename: -- self._do_close_fo() -+ # close it up -+ self.fo.flush() -+ self.fo.close() -+ -+ # Set the URL where we got it from: -+ if xattr is not None: -+ # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes -+ try: -+ xattr.set(self.filename, 'user.xdg.origin.url', self.url) -+ except: -+ pass # URL too long. = IOError ... ignore everything. -+ -+ # set the time -+ mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) -+ if mod_time != -1: -+ try: -+ os.utime(self.filename, (mod_time, mod_time)) -+ except OSError, e: -+ err = URLGrabError(16, _(\ -+ 'error setting timestamp on file %s from %s, OSError: %s') -+ % (self.filename, self.url, e)) -+ err.url = self.url -+ raise err - # re open it - try: - self.fo = open(self.filename, 'r') -@@ -1643,47 +1731,6 @@ class PyCurlFileObject(object): - - self._complete = True - -- def _do_open_fo(self): -- self._prog_reportname = str(self.filename) -- self._prog_basename = os.path.basename(self.filename) -- if self.append: mode = 'ab' -- else: mode = 'wb' -- -- if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \ -- (self.filename, mode)) -- try: -- self.fo = open(self.filename, mode) -- except IOError, e: -- err = URLGrabError(16, _(\ -- 'error opening local file from %s, IOError: %s') % (self.url, e)) -- err.url = self.url -- raise err -- -- def _do_close_fo(self): -- # close it up -- self.fo.flush() -- self.fo.close() -- -- # Set the URL where we got it from: -- if xattr is not None: -- # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes -- try: -- xattr.set(self.filename, 'user.xdg.origin.url', self.url) -- except: -- pass # URL too long. = IOError ... ignore everything. -- -- # set the time -- mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) -- if mod_time != -1: -- try: -- os.utime(self.filename, (mod_time, mod_time)) -- except OSError, e: -- err = URLGrabError(16, _(\ -- 'error setting timestamp on file %s from %s, OSError: %s') -- % (self.filename, self.url, e)) -- err.url = self.url -- raise err -- - def _fill_buffer(self, amt=None): - """fill the buffer to contain at least 'amt' bytes by reading - from the underlying file object. If amt is None, then it will -@@ -1858,6 +1905,425 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0, - - - ##################################################################### -+# Serializer + parser: A replacement of the rather bulky Json code. -+# -+# - handles basic python literals, lists and tuples. 
-+# - serialized strings never contain ' ' or '\n' -+# -+##################################################################### -+ -+_quoter_map = {} -+for c in '%[(,)] \n': -+ _quoter_map[c] = '%%%02x' % ord(c) -+del c -+ -+def _dumps(v): -+ if v is None: return 'None' -+ if v is True: return 'True' -+ if v is False: return 'False' -+ if type(v) in (int, long, float): -+ return str(v) -+ if type(v) == unicode: -+ v = v.encode('UTF8') -+ if type(v) == str: -+ def quoter(c): return _quoter_map.get(c, c) -+ return "'%s'" % ''.join(map(quoter, v)) -+ if type(v) == tuple: -+ return "(%s)" % ','.join(map(_dumps, v)) -+ if type(v) == list: -+ return "[%s]" % ','.join(map(_dumps, v)) -+ raise TypeError, 'Can\'t serialize %s' % v -+ -+def _loads(s): -+ def decode(v): -+ if v == 'None': return None -+ if v == 'True': return True -+ if v == 'False': return False -+ try: return int(v) -+ except ValueError: pass -+ try: return float(v) -+ except ValueError: pass -+ if len(v) >= 2 and v[0] == v[-1] == "'": -+ ret = []; i = 1 -+ while True: -+ j = v.find('%', i) -+ ret.append(v[i:j]) # skips the final "'" -+ if j == -1: break -+ ret.append(chr(int(v[j + 1:j + 3], 16))) -+ i = j + 3 -+ v = ''.join(ret) -+ return v -+ stk = None -+ l = [] -+ i = j = 0 -+ while True: -+ if j == len(s) or s[j] in ',)]': -+ if j > i: -+ l.append(decode(s[i:j])) -+ if j == len(s): break -+ if s[j] in ')]': -+ if s[j] == ')': -+ l = tuple(l) -+ stk[0].append(l) -+ l, stk = stk -+ i = j = j + 1 -+ elif s[j] in '[(': -+ stk = l, stk -+ l = [] -+ i = j = j + 1 -+ else: -+ j += 1 # safe because '[(,)]' are quoted -+ if stk: raise ValueError -+ if len(l) == 1: l = l[0] -+ return l -+ -+ -+##################################################################### -+# External downloader process -+##################################################################### -+ -+def _readlines(fd): -+ buf = os.read(fd, 4096) -+ if not buf: return None -+ # whole lines only, no buffering -+ while buf[-1] != '\n': -+ buf += os.read(fd, 4096) -+ return buf[:-1].split('\n') -+ -+import subprocess -+ -+class _ExternalDownloader: -+ def __init__(self): -+ self.popen = subprocess.Popen( -+ '/usr/libexec/urlgrabber-ext-down', -+ stdin = subprocess.PIPE, -+ stdout = subprocess.PIPE, -+ ) -+ self.stdin = self.popen.stdin.fileno() -+ self.stdout = self.popen.stdout.fileno() -+ self.running = {} -+ self.cnt = 0 -+ -+ # list of options we pass to downloader -+ _options = ( -+ 'url', 'filename', -+ 'timeout', 'close_connection', 'keepalive', -+ 'throttle', 'bandwidth', 'range', 'reget', -+ 'user_agent', 'http_headers', 'ftp_headers', -+ 'proxy', 'prefix', 'username', 'password', -+ 'ssl_ca_cert', -+ 'ssl_cert', 'ssl_cert_type', -+ 'ssl_key', 'ssl_key_type', -+ 'ssl_key_pass', -+ 'ssl_verify_peer', 'ssl_verify_host', -+ 'size', 'max_header_size', 'ip_resolve', -+ ) -+ -+ def start(self, opts): -+ arg = [] -+ for k in self._options: -+ v = getattr(opts, k) -+ if v is None: continue -+ arg.append('%s=%s' % (k, _dumps(v))) -+ if opts.progress_obj: -+ arg.append('progress_obj=True') -+ arg = ' '.join(arg) -+ if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url) -+ -+ self.cnt += 1 -+ self.running[self.cnt] = opts -+ os.write(self.stdin, arg +'\n') -+ -+ def perform(self): -+ ret = [] -+ lines = _readlines(self.stdout) -+ if not lines: -+ if DEBUG: DEBUG.info('downloader died') -+ raise KeyboardInterrupt -+ for line in lines: -+ # parse downloader output -+ line = line.split(' ', 5) -+ _id, size = map(int, line[:2]) -+ if len(line) == 2: -+ opts 
= self.running[_id] -+ m = opts.progress_obj -+ if m: -+ if not m.last_update_time: -+ m.start(text = opts.text) -+ m.update(size) -+ continue -+ # job done -+ opts = self.running.pop(_id) -+ if line[4] == 'OK': -+ ug_err = None -+ if DEBUG: DEBUG.info('success') -+ else: -+ ug_err = URLGrabError(int(line[4]), line[5]) -+ if DEBUG: DEBUG.info('failure: %s', err) -+ _TH.update(opts.url, int(line[2]), float(line[3]), ug_err) -+ ret.append((opts, size, ug_err)) -+ return ret -+ -+ def abort(self): -+ self.popen.stdin.close() -+ self.popen.stdout.close() -+ self.popen.wait() -+ -+class _ExternalDownloaderPool: -+ def __init__(self): -+ self.epoll = select.epoll() -+ self.running = {} -+ self.cache = {} -+ -+ def start(self, opts): -+ host = urlparse.urlsplit(opts.url).netloc -+ dl = self.cache.pop(host, None) -+ if not dl: -+ dl = _ExternalDownloader() -+ fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD) -+ fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC) -+ self.epoll.register(dl.stdout, select.EPOLLIN) -+ self.running[dl.stdout] = dl -+ dl.start(opts) -+ -+ def perform(self): -+ ret = [] -+ for fd, event in self.epoll.poll(): -+ assert event & select.EPOLLIN -+ done = self.running[fd].perform() -+ if not done: continue -+ assert len(done) == 1 -+ ret.extend(done) -+ -+ # dl finished, move it to the cache -+ host = urlparse.urlsplit(done[0][0].url).netloc -+ if host in self.cache: self.cache[host].abort() -+ self.epoll.unregister(fd) -+ self.cache[host] = self.running.pop(fd) -+ return ret -+ -+ def abort(self): -+ for dl in self.running.values(): -+ self.epoll.unregister(dl.stdout) -+ dl.abort() -+ for dl in self.cache.values(): -+ dl.abort() -+ -+ -+##################################################################### -+# High level async API -+##################################################################### -+ -+_async_queue = [] -+ -+def parallel_wait(meter = 'text'): -+ '''Process queued requests in parallel. 
-+ ''' -+ -+ if meter: -+ count = total = 0 -+ for opts in _async_queue: -+ count += 1 -+ total += opts.size -+ if meter == 'text': -+ from progress import TextMultiFileMeter -+ meter = TextMultiFileMeter() -+ meter.start(count, total) -+ -+ dl = _ExternalDownloaderPool() -+ host_con = {} # current host connection counts -+ -+ def start(opts, tries): -+ key, limit = opts.async -+ host_con[key] = host_con.get(key, 0) + 1 -+ opts.tries = tries -+ opts.progress_obj = meter and meter.newMeter() -+ if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url) -+ dl.start(opts) -+ -+ def perform(): -+ for opts, size, ug_err in dl.perform(): -+ key, limit = opts.async -+ host_con[key] -= 1 -+ if meter: -+ m = opts.progress_obj -+ m.basename = os.path.basename(opts.filename) -+ if ug_err: -+ m.failure(ug_err.args[1]) -+ else: -+ # file size might have changed -+ meter.re.total += size - opts.size -+ m.end(size) -+ meter.removeMeter(m) -+ -+ if ug_err is None: -+ if opts.checkfunc: -+ try: _run_callback(opts.checkfunc, opts) -+ except URLGrabError, ug_err: pass -+ if ug_err is None: -+ continue -+ -+ retry = opts.retry or 0 -+ if opts.failure_callback: -+ opts.exception = ug_err -+ try: _run_callback(opts.failure_callback, opts) -+ except URLGrabError, ug_err: -+ retry = 0 # no retries -+ if opts.tries < retry and ug_err.args[0] in opts.retrycodes: -+ start(opts, opts.tries + 1) # simple retry -+ continue -+ -+ if opts.mirror_group: -+ mg, failed = opts.mirror_group -+ opts.mirror = key -+ opts.exception = ug_err -+ action = _run_callback(mg.failure_callback, opts) -+ if not (action and action.get('fail')): -+ # mask this mirror and retry -+ failed.add(key) -+ _async_queue.append(opts) -+ continue -+ -+ # urlgrab failed -+ opts.exception = ug_err -+ _run_callback(opts.failfunc, opts) -+ -+ try: -+ idx = 0 -+ while True: -+ if idx >= len(_async_queue): -+ # the queue is empty -+ if not dl.running: break -+ # pending dl may extend it -+ perform() -+ continue -+ -+ # handle next request -+ opts = _async_queue[idx] -+ idx += 1 -+ -+ # check global limit -+ while len(dl.running) >= opts.max_connections: -+ perform() -+ -+ if opts.mirror_group: -+ mg, failed = opts.mirror_group -+ -+ # find the best mirror -+ best = None -+ for mirror in mg.mirrors: -+ key = mirror['mirror'] -+ if key in failed: continue -+ -+ # estimate mirror speed -+ speed = _TH.estimate(key) -+ speed /= 1 + host_con.get(key, 0) -+ if best is None or speed > best_speed: -+ best = mirror -+ best_speed = speed -+ -+ if best is None: -+ opts.exception = URLGrabError(256, _('No more mirrors to try.')) -+ _run_callback(opts.failfunc, opts) -+ continue -+ -+ # update the current mirror and limit -+ key = best['mirror'] -+ limit = best.get('kwargs', {}).get('max_connections', 3) -+ opts.async = key, limit -+ -+ # update URL and proxy -+ url = mg._join_url(key, opts.relative_url) -+ url, parts = opts.urlparser.parse(url, opts) -+ opts.find_proxy(url, parts[0]) -+ opts.url = url -+ -+ # check host limit, then start -+ key, limit = opts.async -+ while host_con.get(key, 0) >= limit: -+ perform() -+ start(opts, 1) -+ except IOError, e: -+ if e.errno != 4: raise -+ raise KeyboardInterrupt -+ -+ finally: -+ dl.abort() -+ if meter: meter.end() -+ del _async_queue[:] -+ _TH.save() -+ -+ -+##################################################################### -+# Host bandwidth estimation -+##################################################################### -+ -+class _TH: -+ hosts = {} -+ dirty = None -+ -+ @staticmethod -+ def load(): 
-+ filename = default_grabber.opts.timedhosts -+ if filename and _TH.dirty is None: -+ try: -+ for line in open(filename): -+ host, speed, fail, ts = line.split() -+ _TH.hosts[host] = int(speed), int(fail), int(ts) -+ except IOError: pass -+ _TH.dirty = False -+ -+ @staticmethod -+ def save(): -+ filename = default_grabber.opts.timedhosts -+ if filename and _TH.dirty is True: -+ tmp = '%s.%d' % (filename, os.getpid()) -+ try: -+ f = open(tmp, 'w') -+ for host in _TH.hosts: -+ f.write(host + ' %d %d %d\n' % _TH.hosts[host]) -+ f.close() -+ os.rename(tmp, filename) -+ except IOError: pass -+ _TH.dirty = False -+ -+ @staticmethod -+ def update(url, dl_size, dl_time, ug_err): -+ _TH.load() -+ host = urlparse.urlsplit(url).netloc -+ speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0) -+ now = time.time() -+ -+ if ug_err is None: -+ # k1: the older, the less useful -+ # k2: if it was <1MiB, don't trust it much -+ # speeds vary, use 10:1 smoothing -+ k1 = 2**((ts - now) / default_grabber.opts.half_life) -+ k2 = min(dl_size / 1e6, 1.0) / 10 -+ speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2) -+ fail = 0 -+ elif getattr(ug_err, 'code', None) == 404: -+ fail = 0 # alive, at least -+ else: -+ fail += 1 # seems dead -+ -+ _TH.hosts[host] = speed, fail, now -+ _TH.dirty = True -+ -+ @staticmethod -+ def estimate(url): -+ _TH.load() -+ host = urlparse.urlsplit(url).netloc -+ default_speed = default_grabber.opts.default_speed -+ try: speed, fail, ts = _TH.hosts[host] -+ except KeyError: return default_speed -+ -+ speed *= 2**-fail -+ k = 2**((ts - time.time()) / default_grabber.opts.half_life) -+ speed = k * speed + (1 - k) * default_speed -+ return speed -+ -+##################################################################### - # TESTING - def _main_test(): - try: url, filename = sys.argv[1:3] -diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py -index 8731aed..d699b61 100644 ---- a/urlgrabber/mirror.py -+++ b/urlgrabber/mirror.py -@@ -76,6 +76,9 @@ CUSTOMIZATION - 'grabber' is omitted, the default grabber will be used. If - kwargs are omitted, then (duh) they will not be used. - -+ kwarg 'max_connections' is used to store the max connection -+ limit of this mirror. -+ - 3) Pass keyword arguments when instantiating the mirror group. - See, for example, the failure_callback argument. 
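   As an illustration of points 2) and 3) above: a minimal sketch only, using
   hypothetical mirror URLs and file names. The dict keys 'mirror' and 'kwargs',
   the per-mirror 'max_connections' kwarg, and the failure_callback keyword are
   the ones documented here; everything else is assumed for the example.

       from urlgrabber.grabber import URLGrabber
       from urlgrabber.mirror import MirrorGroup

       def on_failure(cb_obj):
           # called when one mirror fails; cb_obj carries the exception
           print 'mirror failed:', cb_obj.exception

       # hypothetical mirrors, each with its own connection limit
       mirrors = [
           {'mirror': 'http://mirror-a.example.com/repo/',
            'kwargs': {'max_connections': 4}},
           {'mirror': 'http://mirror-b.example.com/repo/',
            'kwargs': {'max_connections': 2}},
       ]
       mg = MirrorGroup(URLGrabber(), mirrors, failure_callback=on_failure)
       mg.urlgrab('packages/foo.rpm', filename='/tmp/foo.rpm')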
- -@@ -91,6 +94,7 @@ import random - import thread # needed for locking to make this threadsafe - - from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8 -+from grabber import _run_callback, _do_raise, _async_queue - - def _(st): - return st -@@ -254,7 +258,7 @@ class MirrorGroup: - # if these values are found in **kwargs passed to one of the urlXXX - # methods, they will be stripped before getting passed on to the - # grabber -- options = ['default_action', 'failure_callback'] -+ options = ['default_action', 'failure_callback', 'failfunc'] - - def _process_kwargs(self, kwargs): - self.failure_callback = kwargs.get('failure_callback') -@@ -403,10 +407,25 @@ class MirrorGroup: - self._failure(gr, obj) - - def urlgrab(self, url, filename=None, **kwargs): -+ if kwargs.get('async'): -+ opts = self.grabber.opts.derive(**kwargs) -+ opts.mirror_group = self, set() -+ opts.relative_url = _to_utf8(url) -+ -+ opts.url = 'http://tbd' -+ opts.filename = filename -+ opts.size = int(opts.size or 0) -+ _async_queue.append(opts) -+ return filename -+ - kw = dict(kwargs) - kw['filename'] = filename - func = 'urlgrab' -- return self._mirror_try(func, url, kw) -+ try: -+ return self._mirror_try(func, url, kw) -+ except URLGrabError, e: -+ obj = CallbackObject(url=url, filename=filename, exception=e, **kwargs) -+ return _run_callback(kwargs.get('failfunc', _do_raise), obj) - - def urlopen(self, url, **kwargs): - kw = dict(kwargs) -diff --git a/urlgrabber/progress.py b/urlgrabber/progress.py -index 3d7e99a..4c126c5 100644 ---- a/urlgrabber/progress.py -+++ b/urlgrabber/progress.py -@@ -576,7 +576,6 @@ class TextMultiFileMeter(MultiFileMeter): - self.fo.write(out) - finally: - self._lock.release() -- self._do_update_meter(meter, now) - - def _do_failure_meter(self, meter, message, now): - self._lock.acquire() -@@ -599,15 +598,6 @@ class TextMultiFileMeter(MultiFileMeter): - pass - finally: - self._lock.release() -- -- def _do_end(self, now): -- self._do_update_meter(None, now) -- self._lock.acquire() -- try: -- self.fo.write('\n') -- self.fo.flush() -- finally: -- self._lock.release() - - ###################################################################### - # support classes and functions diff --git a/python-urlgrabber.spec b/python-urlgrabber.spec index 5b79cdb..e89deeb 100644 --- a/python-urlgrabber.spec +++ b/python-urlgrabber.spec @@ -3,11 +3,9 @@ Summary: A high-level cross-protocol url-grabber Name: python-urlgrabber Version: 3.9.1 -Release: 14%{?dist} +Release: 15%{?dist} Source0: urlgrabber-%{version}.tar.gz Patch1: urlgrabber-HEAD.patch -Patch2: multi-downloader.patch -Patch3: file-url-profiling.patch License: LGPLv2+ Group: Development/Libraries @@ -26,8 +24,6 @@ authentication, proxies and more. %prep %setup -q -n urlgrabber-%{version} %patch1 -p1 -%patch2 -p1 -%patch3 -p1 %build python setup.py build @@ -48,6 +44,12 @@ rm -rf $RPM_BUILD_ROOT %attr(0755,root,root) %{_libexecdir}/urlgrabber-ext-down %changelog +* Fri Jul 20 2012 Zdeněk Pavlas - 3.9.1-15 +- Update to latest head, misc bugfixes: BZ 832028, 831904, 831291. +- Disable Kerberos auth. BZ 769254 +- copy_local bugfix. 
BZ 837018 +- send 'tries' counter to mirror failure callback + * Mon May 21 2012 Zdeněk Pavlas - 3.9.1-14 - timedhosts: sanity check on dl_time diff --git a/urlgrabber-HEAD.patch b/urlgrabber-HEAD.patch index 75e10e5..4e1b34b 100644 --- a/urlgrabber-HEAD.patch +++ b/urlgrabber-HEAD.patch @@ -71,6 +71,101 @@ index 518e512..09cd896 100644 def help_doc(self): print __doc__ +diff --git a/scripts/urlgrabber-ext-down b/scripts/urlgrabber-ext-down +new file mode 100755 +index 0000000..3da55a4 +--- /dev/null ++++ b/scripts/urlgrabber-ext-down +@@ -0,0 +1,72 @@ ++#! /usr/bin/python ++# A very simple external downloader ++# Copyright 2011-2012 Zdenek Pavlas ++ ++# This library is free software; you can redistribute it and/or ++# modify it under the terms of the GNU Lesser General Public ++# License as published by the Free Software Foundation; either ++# version 2.1 of the License, or (at your option) any later version. ++# ++# This library is distributed in the hope that it will be useful, ++# but WITHOUT ANY WARRANTY; without even the implied warranty of ++# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ++# Lesser General Public License for more details. ++# ++# You should have received a copy of the GNU Lesser General Public ++# License along with this library; if not, write to the ++# Free Software Foundation, Inc., ++# 59 Temple Place, Suite 330, ++# Boston, MA 02111-1307 USA ++ ++import time, os, errno, sys ++from urlgrabber.grabber import \ ++ _readlines, URLGrabberOptions, _loads, \ ++ PyCurlFileObject, URLGrabError ++ ++def write(fmt, *arg): ++ try: os.write(1, fmt % arg) ++ except OSError, e: ++ if e.args[0] != errno.EPIPE: raise ++ sys.exit(1) ++ ++class ProxyProgress: ++ def start(self, *d1, **d2): ++ self.next_update = 0 ++ def update(self, _amount_read): ++ t = time.time() ++ if t < self.next_update: return ++ self.next_update = t + 0.31 ++ write('%d %d\n', self._id, _amount_read) ++ ++def main(): ++ import signal ++ signal.signal(signal.SIGINT, lambda n, f: sys.exit(1)) ++ cnt = 0 ++ while True: ++ lines = _readlines(0) ++ if not lines: break ++ for line in lines: ++ cnt += 1 ++ opts = URLGrabberOptions() ++ opts._id = cnt ++ for k in line.split(' '): ++ k, v = k.split('=', 1) ++ setattr(opts, k, _loads(v)) ++ if opts.progress_obj: ++ opts.progress_obj = ProxyProgress() ++ opts.progress_obj._id = cnt ++ tm = time.time() ++ try: ++ fo = PyCurlFileObject(opts.url, opts.filename, opts) ++ fo._do_grab() ++ fo.fo.close() ++ size = fo._amount_read ++ dlsz = size - fo._reget_length ++ ug_err = 'OK' ++ except URLGrabError, e: ++ size = dlsz = 0 ++ ug_err = '%d %s' % e.args ++ write('%d %d %d %.3f %s\n', opts._id, size, dlsz, time.time() - tm, ug_err) ++ ++if __name__ == '__main__': ++ main() +diff --git a/setup.py b/setup.py +index d0b87b8..bfa4a18 100644 +--- a/setup.py ++++ b/setup.py +@@ -15,8 +15,10 @@ url = _urlgrabber.__url__ + packages = ['urlgrabber'] + package_dir = {'urlgrabber':'urlgrabber'} + scripts = ['scripts/urlgrabber'] +-data_files = [('share/doc/' + name + '-' + version, +- ['README','LICENSE', 'TODO', 'ChangeLog'])] ++data_files = [ ++ ('share/doc/' + name + '-' + version, ['README','LICENSE', 'TODO', 'ChangeLog']), ++ ('libexec', ['scripts/urlgrabber-ext-down']), ++] + options = { 'clean' : { 'all' : 1 } } + classifiers = [ + 'Development Status :: 4 - Beta', diff --git a/test/base_test_code.py b/test/base_test_code.py index 50c6348..5fb43f9 100644 --- a/test/base_test_code.py @@ -138,7 +233,7 @@ index 3e5f3b7..8eeaeda 100644 return (fb,lb) diff --git 
a/urlgrabber/grabber.py b/urlgrabber/grabber.py -index e090e90..38ae1f7 100644 +index e090e90..83823ea 100644 --- a/urlgrabber/grabber.py +++ b/urlgrabber/grabber.py @@ -49,7 +49,7 @@ GENERAL ARGUMENTS (kwargs) @@ -200,7 +295,7 @@ index e090e90..38ae1f7 100644 ssl_ca_cert = None this option can be used if M2Crypto is available and will be -@@ -211,43 +221,48 @@ GENERAL ARGUMENTS (kwargs) +@@ -211,43 +221,75 @@ GENERAL ARGUMENTS (kwargs) No-op when using the curl backend (default) @@ -255,10 +350,53 @@ index e090e90..38ae1f7 100644 + What type of name to IP resolving to use, default is to do both IPV4 and + IPV6. + ++ async = (key, limit) ++ ++ When this option is set, the urlgrab() is not processed immediately ++ but queued. parallel_wait() then processes grabs in parallel, limiting ++ the numer of connections in each 'key' group to at most 'limit'. ++ ++ max_connections ++ ++ The global connection limit. ++ ++ timedhosts ++ ++ The filename of the host download statistics. If defined, urlgrabber ++ will update the stats at the end of every download. At the end of ++ parallel_wait(), the updated stats are saved. If synchronous grabs ++ are used, you should call th_save(). ++ ++ default_speed, half_life ++ ++ These options only affect the async mirror selection code. ++ The default_speed option sets the speed estimate for mirrors ++ we have never downloaded from, and defaults to 1 MBps. ++ ++ The speed estimate also drifts exponentially from the speed ++ actually measured to the default speed, with default ++ period of 30 days. ++ RETRY RELATED ARGUMENTS -@@ -420,6 +435,7 @@ import time +@@ -328,6 +370,15 @@ RETRY RELATED ARGUMENTS + but it cannot (without severe trickiness) prevent the exception + from being raised. + ++ failfunc = None ++ ++ The callback that gets called when urlgrab request fails. ++ If defined, urlgrab() calls it instead of raising URLGrabError. ++ Callback syntax is identical to failure_callback. ++ ++ Contrary to failure_callback, it's called only once. It's primary ++ purpose is to use urlgrab() without a try/except block. ++ + interrupt_callback = None + + This callback is called if KeyboardInterrupt is received at any +@@ -420,6 +471,7 @@ import time import string import urllib import urllib2 @@ -266,8 +404,12 @@ index e090e90..38ae1f7 100644 import mimetools import thread import types -@@ -431,6 +447,14 @@ from httplib import HTTPException - import socket +@@ -428,9 +480,17 @@ import pycurl + from ftplib import parse150 + from StringIO import StringIO + from httplib import HTTPException +-import socket ++import socket, select, fcntl from byterange import range_tuple_normalize, range_tuple_to_header, RangeError +try: @@ -281,7 +423,7 @@ index e090e90..38ae1f7 100644 ######################################################################## # MODULE INITIALIZATION ######################################################################## -@@ -439,6 +463,12 @@ try: +@@ -439,6 +499,12 @@ try: except: __version__ = '???' @@ -294,7 +436,7 @@ index e090e90..38ae1f7 100644 ######################################################################## # functions for debugging output. These functions are here because they # are also part of the module initialization. 
-@@ -527,6 +557,22 @@ def _(st): +@@ -527,6 +593,22 @@ def _(st): # END MODULE INITIALIZATION ######################################################################## @@ -317,7 +459,7 @@ index e090e90..38ae1f7 100644 class URLGrabError(IOError): -@@ -662,6 +708,7 @@ class URLParser: +@@ -662,6 +744,7 @@ class URLParser: opts.quote = 0 --> do not quote it opts.quote = None --> guess """ @@ -325,7 +467,7 @@ index e090e90..38ae1f7 100644 quote = opts.quote if opts.prefix: -@@ -768,6 +815,41 @@ class URLGrabberOptions: +@@ -768,6 +851,41 @@ class URLGrabberOptions: else: # throttle is a float return self.bandwidth * self.throttle @@ -367,7 +509,12 @@ index e090e90..38ae1f7 100644 def derive(self, **kwargs): """Create a derived URLGrabberOptions instance. This method creates a new instance and overrides the -@@ -800,21 +882,25 @@ class URLGrabberOptions: +@@ -796,25 +914,31 @@ class URLGrabberOptions: + self.retry = None + self.retrycodes = [-1,2,4,5,6,7] + self.checkfunc = None ++ self.failfunc = _do_raise + self.copy_local = 0 self.close_connection = 0 self.range = None self.user_agent = 'urlgrabber/%s' % __version__ @@ -375,6 +522,7 @@ index e090e90..38ae1f7 100644 self.keepalive = 1 self.proxies = None + self.libproxy = False ++ self.proxy = None self.reget = None self.failure_callback = None self.interrupt_callback = None @@ -394,16 +542,76 @@ index e090e90..38ae1f7 100644 self.ssl_ca_cert = None # sets SSL_CAINFO - path to certdb self.ssl_context = None # no-op in pycurl self.ssl_verify_peer = True # check peer's cert for authenticityb -@@ -846,7 +932,7 @@ class URLGrabberOptions: +@@ -827,6 +951,12 @@ class URLGrabberOptions: + self.size = None # if we know how big the thing we're getting is going + # to be. this is ultimately a MAXIMUM size for the file + self.max_header_size = 2097152 #2mb seems reasonable for maximum header size ++ self.async = None # blocking by default ++ self.mirror_group = None ++ self.max_connections = 5 ++ self.timedhosts = None ++ self.half_life = 30*24*60*60 # 30 days ++ self.default_speed = 1e6 # 1 MBit + + def __repr__(self): + return self.format() +@@ -846,7 +976,18 @@ class URLGrabberOptions: s = s + indent + '}' return s -class URLGrabber: ++def _do_raise(obj): ++ raise obj.exception ++ ++def _run_callback(cb, obj): ++ if not cb: ++ return ++ if callable(cb): ++ return cb(obj) ++ cb, arg, karg = cb ++ return cb(obj, *arg, **karg) ++ +class URLGrabber(object): """Provides easy opening of URLs with a variety of options. All options are specified as kwargs. 
Options may be specified when -@@ -912,9 +998,11 @@ class URLGrabber: +@@ -872,7 +1013,6 @@ class URLGrabber: + # beware of infinite loops :) + tries = tries + 1 + exception = None +- retrycode = None + callback = None + if DEBUG: DEBUG.info('attempt %i/%s: %s', + tries, opts.retry, args[0]) +@@ -883,23 +1023,24 @@ class URLGrabber: + except URLGrabError, e: + exception = e + callback = opts.failure_callback +- retrycode = e.errno + except KeyboardInterrupt, e: + exception = e + callback = opts.interrupt_callback ++ if not callback: ++ raise + + if DEBUG: DEBUG.info('exception: %s', exception) + if callback: + if DEBUG: DEBUG.info('calling callback: %s', callback) +- cb_func, cb_args, cb_kwargs = self._make_callback(callback) + obj = CallbackObject(exception=exception, url=args[0], + tries=tries, retry=opts.retry) +- cb_func(obj, *cb_args, **cb_kwargs) ++ _run_callback(callback, obj) + + if (opts.retry is None) or (tries == opts.retry): + if DEBUG: DEBUG.info('retries exceeded, re-raising') + raise + ++ retrycode = getattr(exception, 'errno', None) + if (retrycode is not None) and (retrycode not in opts.retrycodes): + if DEBUG: DEBUG.info('retrycode (%i) not in list %s, re-raising', + retrycode, opts.retrycodes) +@@ -912,9 +1053,11 @@ class URLGrabber: returned that supports them. The file object can be treated like any other file object. """ @@ -415,7 +623,7 @@ index e090e90..38ae1f7 100644 def retryfunc(opts, url): return PyCurlFileObject(url, filename=None, opts=opts) return self._retry(opts, retryfunc, url) -@@ -925,12 +1013,17 @@ class URLGrabber: +@@ -925,12 +1068,17 @@ class URLGrabber: urlgrab returns the filename of the local file, which may be different from the passed-in filename if copy_local == 0. """ @@ -433,7 +641,57 @@ index e090e90..38ae1f7 100644 if scheme == 'file' and not opts.copy_local: # just return the name of the local file - don't make a # copy currently -@@ -982,9 +1075,11 @@ class URLGrabber: +@@ -950,30 +1098,36 @@ class URLGrabber: + + elif not opts.range: + if not opts.checkfunc is None: +- cb_func, cb_args, cb_kwargs = \ +- self._make_callback(opts.checkfunc) +- obj = CallbackObject() +- obj.filename = path +- obj.url = url +- apply(cb_func, (obj, )+cb_args, cb_kwargs) ++ obj = CallbackObject(filename=path, url=url) ++ _run_callback(opts.checkfunc, obj) + return path + ++ if opts.async: ++ opts.url = url ++ opts.filename = filename ++ opts.size = int(opts.size or 0) ++ _async_queue.append(opts) ++ return filename ++ + def retryfunc(opts, url, filename): ++ tm = time.time() + fo = PyCurlFileObject(url, filename, opts) + try: + fo._do_grab() ++ _TH.update(url, fo._amount_read - fo._reget_length, time.time() - tm, None) + if not opts.checkfunc is None: +- cb_func, cb_args, cb_kwargs = \ +- self._make_callback(opts.checkfunc) +- obj = CallbackObject() +- obj.filename = filename +- obj.url = url +- apply(cb_func, (obj, )+cb_args, cb_kwargs) ++ obj = CallbackObject(filename=filename, url=url) ++ _run_callback(opts.checkfunc, obj) + finally: + fo.close() + return filename + +- return self._retry(opts, retryfunc, url, filename) ++ try: ++ return self._retry(opts, retryfunc, url, filename) ++ except URLGrabError, e: ++ _TH.update(url, 0, 0, e) ++ opts.exception = e ++ return _run_callback(opts.failfunc, opts) + + def urlread(self, url, limit=None, **kwargs): + """read the url into a string, up to 'limit' bytes +@@ -982,9 +1136,11 @@ class URLGrabber: "I want the first N bytes" but rather 'read the whole file into memory, but don't use too much' """ @@ -445,7 +703,30 
@@ index e090e90..38ae1f7 100644 if limit is not None: limit = limit + 1 -@@ -1030,7 +1125,7 @@ class URLGrabber: +@@ -1000,12 +1156,8 @@ class URLGrabber: + else: s = fo.read(limit) + + if not opts.checkfunc is None: +- cb_func, cb_args, cb_kwargs = \ +- self._make_callback(opts.checkfunc) +- obj = CallbackObject() +- obj.data = s +- obj.url = url +- apply(cb_func, (obj, )+cb_args, cb_kwargs) ++ obj = CallbackObject(data=s, url=url) ++ _run_callback(opts.checkfunc, obj) + finally: + fo.close() + return s +@@ -1020,6 +1172,7 @@ class URLGrabber: + return s + + def _make_callback(self, callback_obj): ++ # not used, left for compatibility + if callable(callback_obj): + return callback_obj, (), {} + else: +@@ -1030,7 +1183,7 @@ class URLGrabber: default_grabber = URLGrabber() @@ -454,7 +735,7 @@ index e090e90..38ae1f7 100644 def __init__(self, url, filename, opts): self.fo = None self._hdr_dump = '' -@@ -1052,10 +1147,11 @@ class PyCurlFileObject(): +@@ -1052,10 +1205,11 @@ class PyCurlFileObject(): self._reget_length = 0 self._prog_running = False self._error = (None, None) @@ -468,7 +749,7 @@ index e090e90..38ae1f7 100644 def __getattr__(self, name): """This effectively allows us to wrap at the instance level. Any attribute not found in _this_ object will be searched for -@@ -1085,9 +1181,14 @@ class PyCurlFileObject(): +@@ -1085,9 +1239,14 @@ class PyCurlFileObject(): return -1 def _hdr_retrieve(self, buf): @@ -484,7 +765,7 @@ index e090e90..38ae1f7 100644 try: self._hdr_dump += buf # we have to get the size before we do the progress obj start -@@ -1104,7 +1205,17 @@ class PyCurlFileObject(): +@@ -1104,7 +1263,17 @@ class PyCurlFileObject(): s = parse150(buf) if s: self.size = int(s) @@ -503,7 +784,7 @@ index e090e90..38ae1f7 100644 return len(buf) except KeyboardInterrupt: return pycurl.READFUNC_ABORT -@@ -1113,8 +1224,10 @@ class PyCurlFileObject(): +@@ -1113,8 +1282,10 @@ class PyCurlFileObject(): if self._parsed_hdr: return self._parsed_hdr statusend = self._hdr_dump.find('\n') @@ -514,7 +795,7 @@ index e090e90..38ae1f7 100644 self._parsed_hdr = mimetools.Message(hdrfp) return self._parsed_hdr -@@ -1127,6 +1240,9 @@ class PyCurlFileObject(): +@@ -1127,6 +1298,9 @@ class PyCurlFileObject(): if not opts: opts = self.opts @@ -524,7 +805,7 @@ index e090e90..38ae1f7 100644 # defaults we're always going to set self.curl_obj.setopt(pycurl.NOPROGRESS, False) -@@ -1136,11 +1252,21 @@ class PyCurlFileObject(): +@@ -1136,11 +1310,21 @@ class PyCurlFileObject(): self.curl_obj.setopt(pycurl.PROGRESSFUNCTION, self._progress_update) self.curl_obj.setopt(pycurl.FAILONERROR, True) self.curl_obj.setopt(pycurl.OPT_FILETIME, True) @@ -546,7 +827,7 @@ index e090e90..38ae1f7 100644 # maybe to be options later self.curl_obj.setopt(pycurl.FOLLOWLOCATION, True) -@@ -1148,9 +1274,11 @@ class PyCurlFileObject(): +@@ -1148,9 +1332,11 @@ class PyCurlFileObject(): # timeouts timeout = 300 @@ -561,7 +842,7 @@ index e090e90..38ae1f7 100644 # ssl options if self.scheme == 'https': -@@ -1158,13 +1286,16 @@ class PyCurlFileObject(): +@@ -1158,13 +1344,16 @@ class PyCurlFileObject(): self.curl_obj.setopt(pycurl.CAPATH, opts.ssl_ca_cert) self.curl_obj.setopt(pycurl.CAINFO, opts.ssl_ca_cert) self.curl_obj.setopt(pycurl.SSL_VERIFYPEER, opts.ssl_verify_peer) @@ -579,7 +860,7 @@ index e090e90..38ae1f7 100644 if opts.ssl_cert_type: self.curl_obj.setopt(pycurl.SSLCERTTYPE, opts.ssl_cert_type) if opts.ssl_key_pass: -@@ -1187,28 +1318,24 @@ class PyCurlFileObject(): +@@ -1187,28 +1376,26 @@ class PyCurlFileObject(): if 
hasattr(opts, 'raw_throttle') and opts.raw_throttle(): self.curl_obj.setopt(pycurl.MAX_RECV_SPEED_LARGE, int(opts.raw_throttle())) @@ -603,7 +884,9 @@ index e090e90..38ae1f7 100644 + # proxy + if opts.proxy is not None: + self.curl_obj.setopt(pycurl.PROXY, opts.proxy) -+ self.curl_obj.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_ANY) ++ self.curl_obj.setopt(pycurl.PROXYAUTH, ++ # All but Kerberos. BZ 769254 ++ pycurl.HTTPAUTH_ANY - pycurl.HTTPAUTH_GSSNEGOTIATE) + + if opts.username and opts.password: + if self.scheme in ('http', 'https'): @@ -622,23 +905,7 @@ index e090e90..38ae1f7 100644 # our url self.curl_obj.setopt(pycurl.URL, self.url) -@@ -1219,8 +1346,14 @@ class PyCurlFileObject(): - return - - try: -+ e = None - self.curl_obj.perform() -- except pycurl.error, e: -+ except pycurl.error, e: pass -+ self._do_perform_exc(e) -+ -+ def _do_perform_exc(self, e): -+ # handle pycurl exception 'e' -+ if e: - # XXX - break some of these out a bit more clearly - # to other URLGrabErrors from - # http://curl.haxx.se/libcurl/c/libcurl-errors.html -@@ -1228,12 +1361,14 @@ class PyCurlFileObject(): +@@ -1228,12 +1415,14 @@ class PyCurlFileObject(): code = self.http_code errcode = e.args[0] @@ -655,7 +922,7 @@ index e090e90..38ae1f7 100644 # this is probably wrong but ultimately this is what happens # we have a legit http code and a pycurl 'writer failed' code -@@ -1244,23 +1379,23 @@ class PyCurlFileObject(): +@@ -1244,23 +1433,23 @@ class PyCurlFileObject(): raise KeyboardInterrupt elif errcode == 28: @@ -686,7 +953,7 @@ index e090e90..38ae1f7 100644 # this is probably wrong but ultimately this is what happens # we have a legit http code and a pycurl 'writer failed' code # which almost always means something aborted it from outside -@@ -1272,33 +1407,94 @@ class PyCurlFileObject(): +@@ -1272,33 +1461,94 @@ class PyCurlFileObject(): elif errcode == 58: msg = _("problem with the local client certificate") err = URLGrabError(14, msg) @@ -788,7 +1055,7 @@ index e090e90..38ae1f7 100644 def _do_open(self): self.curl_obj = _curl_cache -@@ -1333,7 +1529,11 @@ class PyCurlFileObject(): +@@ -1333,7 +1583,11 @@ class PyCurlFileObject(): if self.opts.range: rt = self.opts.range @@ -801,31 +1068,7 @@ index e090e90..38ae1f7 100644 if rt: header = range_tuple_to_header(rt) -@@ -1407,22 +1607,7 @@ class PyCurlFileObject(): - _was_filename = False - if type(self.filename) in types.StringTypes and self.filename: - _was_filename = True -- self._prog_reportname = str(self.filename) -- self._prog_basename = os.path.basename(self.filename) -- -- if self.append: mode = 'ab' -- else: mode = 'wb' -- -- if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \ -- (self.filename, mode)) -- try: -- self.fo = open(self.filename, mode) -- except IOError, e: -- err = URLGrabError(16, _(\ -- 'error opening local file from %s, IOError: %s') % (self.url, e)) -- err.url = self.url -- raise err -- -+ self._do_open_fo() - else: - self._prog_reportname = 'MEMORY' - self._prog_basename = 'MEMORY' -@@ -1434,27 +1619,71 @@ class PyCurlFileObject(): +@@ -1434,21 +1688,46 @@ class PyCurlFileObject(): #fh, self._temp_name = mkstemp() #self.fo = open(self._temp_name, 'wb') @@ -834,21 +1077,38 @@ index e090e90..38ae1f7 100644 - - - -- if _was_filename: -- # close it up + try: + self._do_perform() + except URLGrabError, e: ++ self.fo.flush() ++ self.fo.close() ++ raise e ++ + if _was_filename: + # close it up self.fo.flush() self.fo.close() -- # set the time -- mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) -- if mod_time != -1: ++ 
++ # Set the URL where we got it from: ++ if xattr is not None: ++ # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes ++ try: ++ xattr.set(self.filename, 'user.xdg.origin.url', self.url) ++ except: ++ pass # URL too long. = IOError ... ignore everything. ++ + # set the time + mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) + if mod_time != -1: - os.utime(self.filename, (mod_time, mod_time)) -+ raise e -+ -+ if _was_filename: -+ self._do_close_fo() ++ try: ++ os.utime(self.filename, (mod_time, mod_time)) ++ except OSError, e: ++ err = URLGrabError(16, _(\ ++ 'error setting timestamp on file %s from %s, OSError: %s') ++ % (self.filename, self.url, e)) ++ err.url = self.url ++ raise err # re open it - self.fo = open(self.filename, 'r') + try: @@ -862,54 +1122,7 @@ index e090e90..38ae1f7 100644 else: #self.fo = open(self._temp_name, 'r') self.fo.seek(0) - - self._complete = True - -+ def _do_open_fo(self): -+ self._prog_reportname = str(self.filename) -+ self._prog_basename = os.path.basename(self.filename) -+ if self.append: mode = 'ab' -+ else: mode = 'wb' -+ -+ if DEBUG: DEBUG.info('opening local file "%s" with mode %s' % \ -+ (self.filename, mode)) -+ try: -+ self.fo = open(self.filename, mode) -+ except IOError, e: -+ err = URLGrabError(16, _(\ -+ 'error opening local file from %s, IOError: %s') % (self.url, e)) -+ err.url = self.url -+ raise err -+ -+ def _do_close_fo(self): -+ # close it up -+ self.fo.flush() -+ self.fo.close() -+ -+ # Set the URL where we got it from: -+ if xattr is not None: -+ # See: http://www.freedesktop.org/wiki/CommonExtendedAttributes -+ try: -+ xattr.set(self.filename, 'user.xdg.origin.url', self.url) -+ except: -+ pass # URL too long. = IOError ... ignore everything. -+ -+ # set the time -+ mod_time = self.curl_obj.getinfo(pycurl.INFO_FILETIME) -+ if mod_time != -1: -+ try: -+ os.utime(self.filename, (mod_time, mod_time)) -+ except OSError, e: -+ err = URLGrabError(16, _(\ -+ 'error setting timestamp on file %s from %s, OSError: %s') -+ % (self.filename, self.url, e)) -+ err.url = self.url -+ raise err -+ - def _fill_buffer(self, amt=None): - """fill the buffer to contain at least 'amt' bytes by reading - from the underlying file object. If amt is None, then it will -@@ -1526,17 +1755,20 @@ class PyCurlFileObject(): +@@ -1526,17 +1805,20 @@ class PyCurlFileObject(): if self._prog_running: downloaded += self._reget_length self.opts.progress_obj.update(downloaded) @@ -935,7 +1148,7 @@ index e090e90..38ae1f7 100644 msg = _("Downloaded more than max size for %s: %s > %s") \ % (self.url, cur, max_size) -@@ -1544,13 +1776,6 @@ class PyCurlFileObject(): +@@ -1544,13 +1826,6 @@ class PyCurlFileObject(): return True return False @@ -949,7 +1162,7 @@ index e090e90..38ae1f7 100644 def read(self, amt=None): self._fill_buffer(amt) if amt is None: -@@ -1582,9 +1807,21 @@ class PyCurlFileObject(): +@@ -1582,9 +1857,21 @@ class PyCurlFileObject(): self.opts.progress_obj.end(self._amount_read) self.fo.close() @@ -972,20 +1185,482 @@ index e090e90..38ae1f7 100644 ##################################################################### # DEPRECATED FUNCTIONS +@@ -1621,6 +1908,442 @@ def retrygrab(url, filename=None, copy_local=0, close_connection=0, + + + ##################################################################### ++# Serializer + parser: A replacement of the rather bulky Json code. ++# ++# - handles basic python literals, lists and tuples. 
++# - serialized strings never contain ' ' or '\n' ++# ++##################################################################### ++ ++_quoter_map = {} ++for c in '%[(,)] \n': ++ _quoter_map[c] = '%%%02x' % ord(c) ++del c ++ ++def _dumps(v): ++ if v is None: return 'None' ++ if v is True: return 'True' ++ if v is False: return 'False' ++ if type(v) in (int, long, float): ++ return str(v) ++ if type(v) == unicode: ++ v = v.encode('UTF8') ++ if type(v) == str: ++ def quoter(c): return _quoter_map.get(c, c) ++ return "'%s'" % ''.join(map(quoter, v)) ++ if type(v) == tuple: ++ return "(%s)" % ','.join(map(_dumps, v)) ++ if type(v) == list: ++ return "[%s]" % ','.join(map(_dumps, v)) ++ raise TypeError, 'Can\'t serialize %s' % v ++ ++def _loads(s): ++ def decode(v): ++ if v == 'None': return None ++ if v == 'True': return True ++ if v == 'False': return False ++ try: return int(v) ++ except ValueError: pass ++ try: return float(v) ++ except ValueError: pass ++ if len(v) >= 2 and v[0] == v[-1] == "'": ++ ret = []; i = 1 ++ while True: ++ j = v.find('%', i) ++ ret.append(v[i:j]) # skips the final "'" ++ if j == -1: break ++ ret.append(chr(int(v[j + 1:j + 3], 16))) ++ i = j + 3 ++ v = ''.join(ret) ++ return v ++ stk = None ++ l = [] ++ i = j = 0 ++ while True: ++ if j == len(s) or s[j] in ',)]': ++ if j > i: ++ l.append(decode(s[i:j])) ++ if j == len(s): break ++ if s[j] in ')]': ++ if s[j] == ')': ++ l = tuple(l) ++ stk[0].append(l) ++ l, stk = stk ++ i = j = j + 1 ++ elif s[j] in '[(': ++ stk = l, stk ++ l = [] ++ i = j = j + 1 ++ else: ++ j += 1 # safe because '[(,)]' are quoted ++ if stk: raise ValueError ++ if len(l) == 1: l = l[0] ++ return l ++ ++ ++##################################################################### ++# External downloader process ++##################################################################### ++ ++def _readlines(fd): ++ buf = os.read(fd, 4096) ++ if not buf: return None ++ # whole lines only, no buffering ++ while buf[-1] != '\n': ++ buf += os.read(fd, 4096) ++ return buf[:-1].split('\n') ++ ++import subprocess ++ ++class _ExternalDownloader: ++ def __init__(self): ++ self.popen = subprocess.Popen( ++ '/usr/libexec/urlgrabber-ext-down', ++ stdin = subprocess.PIPE, ++ stdout = subprocess.PIPE, ++ ) ++ self.stdin = self.popen.stdin.fileno() ++ self.stdout = self.popen.stdout.fileno() ++ self.running = {} ++ self.cnt = 0 ++ ++ # list of options we pass to downloader ++ _options = ( ++ 'url', 'filename', ++ 'timeout', 'close_connection', 'keepalive', ++ 'throttle', 'bandwidth', 'range', 'reget', ++ 'user_agent', 'http_headers', 'ftp_headers', ++ 'proxy', 'prefix', 'username', 'password', ++ 'ssl_ca_cert', ++ 'ssl_cert', 'ssl_cert_type', ++ 'ssl_key', 'ssl_key_type', ++ 'ssl_key_pass', ++ 'ssl_verify_peer', 'ssl_verify_host', ++ 'size', 'max_header_size', 'ip_resolve', ++ ) ++ ++ def start(self, opts): ++ arg = [] ++ for k in self._options: ++ v = getattr(opts, k) ++ if v is None: continue ++ arg.append('%s=%s' % (k, _dumps(v))) ++ if opts.progress_obj: ++ arg.append('progress_obj=True') ++ arg = ' '.join(arg) ++ if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url) ++ ++ self.cnt += 1 ++ self.running[self.cnt] = opts ++ os.write(self.stdin, arg +'\n') ++ ++ def perform(self): ++ ret = [] ++ lines = _readlines(self.stdout) ++ if not lines: ++ if DEBUG: DEBUG.info('downloader died') ++ raise KeyboardInterrupt ++ for line in lines: ++ # parse downloader output ++ line = line.split(' ', 5) ++ _id, size = map(int, line[:2]) ++ if len(line) == 2: ++ 
self.running[_id].progress_obj.update(size) ++ continue ++ # job done ++ opts = self.running.pop(_id) ++ if line[4] == 'OK': ++ ug_err = None ++ if DEBUG: DEBUG.info('success') ++ else: ++ ug_err = URLGrabError(int(line[4]), line[5]) ++ if DEBUG: DEBUG.info('failure: %s', err) ++ _TH.update(opts.url, int(line[2]), float(line[3]), ug_err, opts.async[0]) ++ ret.append((opts, size, ug_err)) ++ return ret ++ ++ def abort(self): ++ self.popen.stdin.close() ++ self.popen.stdout.close() ++ self.popen.wait() ++ ++class _ExternalDownloaderPool: ++ def __init__(self): ++ self.epoll = select.epoll() ++ self.running = {} ++ self.cache = {} ++ ++ def start(self, opts): ++ host = urlparse.urlsplit(opts.url).netloc ++ dl = self.cache.pop(host, None) ++ if not dl: ++ dl = _ExternalDownloader() ++ fl = fcntl.fcntl(dl.stdin, fcntl.F_GETFD) ++ fcntl.fcntl(dl.stdin, fcntl.F_SETFD, fl | fcntl.FD_CLOEXEC) ++ self.epoll.register(dl.stdout, select.EPOLLIN) ++ self.running[dl.stdout] = dl ++ dl.start(opts) ++ ++ def perform(self): ++ ret = [] ++ for fd, event in self.epoll.poll(): ++ assert event & select.EPOLLIN ++ done = self.running[fd].perform() ++ if not done: continue ++ assert len(done) == 1 ++ ret.extend(done) ++ ++ # dl finished, move it to the cache ++ host = urlparse.urlsplit(done[0][0].url).netloc ++ if host in self.cache: self.cache[host].abort() ++ self.epoll.unregister(fd) ++ self.cache[host] = self.running.pop(fd) ++ return ret ++ ++ def abort(self): ++ for dl in self.running.values(): ++ self.epoll.unregister(dl.stdout) ++ dl.abort() ++ for dl in self.cache.values(): ++ dl.abort() ++ ++ ++##################################################################### ++# High level async API ++##################################################################### ++ ++_async_queue = [] ++ ++def parallel_wait(meter = 'text'): ++ '''Process queued requests in parallel. 
++ ''' ++ ++ if meter: ++ count = total = 0 ++ for opts in _async_queue: ++ if opts.progress_obj: ++ count += 1 ++ total += opts.size ++ if meter == 'text': ++ from progress import TextMultiFileMeter ++ meter = TextMultiFileMeter() ++ meter.start(count, total) ++ ++ dl = _ExternalDownloaderPool() ++ host_con = {} # current host connection counts ++ ++ def start(opts, tries): ++ key, limit = opts.async ++ host_con[key] = host_con.get(key, 0) + 1 ++ opts.tries = tries ++ if meter and opts.progress_obj: ++ opts.progress_obj = meter.newMeter() ++ opts.progress_obj.start(text=opts.text, basename=os.path.basename(opts.filename)) ++ else: ++ opts.progress_obj = None ++ if DEBUG: DEBUG.info('attempt %i/%s: %s', opts.tries, opts.retry, opts.url) ++ dl.start(opts) ++ ++ def perform(): ++ for opts, size, ug_err in dl.perform(): ++ key, limit = opts.async ++ host_con[key] -= 1 ++ m = opts.progress_obj ++ if m: ++ if ug_err: ++ m.failure(ug_err.args[1]) ++ else: ++ # file size might have changed ++ meter.re.total += size - opts.size ++ m.end(size) ++ meter.removeMeter(m) ++ ++ if ug_err is None: ++ if opts.checkfunc: ++ try: _run_callback(opts.checkfunc, opts) ++ except URLGrabError, ug_err: pass ++ if ug_err is None: ++ continue ++ ++ retry = opts.retry or 0 ++ if opts.failure_callback: ++ opts.exception = ug_err ++ try: _run_callback(opts.failure_callback, opts) ++ except URLGrabError, ug_err: ++ retry = 0 # no retries ++ if opts.tries < retry and ug_err.errno in opts.retrycodes: ++ start(opts, opts.tries + 1) # simple retry ++ continue ++ ++ if opts.mirror_group: ++ mg, failed, removed = opts.mirror_group ++ failed[key] = failed.get(key, 0) + 1 ++ opts.mirror = key ++ opts.exception = ug_err ++ action = mg.default_action or {} ++ if mg.failure_callback: ++ opts.tries = sum(failed.values()) ++ action.update(_run_callback(mg.failure_callback, opts)) ++ if not action.get('fail', 0): ++ # mask this mirror and retry ++ if action.get('remove', 1): ++ removed.add(key) ++ _async_queue.append(opts) ++ continue ++ ++ # urlgrab failed ++ opts.exception = ug_err ++ _run_callback(opts.failfunc, opts) ++ ++ try: ++ idx = 0 ++ while True: ++ if idx >= len(_async_queue): ++ # the queue is empty ++ if not dl.running: break ++ # pending dl may extend it ++ perform() ++ continue ++ ++ # handle next request ++ opts = _async_queue[idx] ++ idx += 1 ++ ++ # check global limit ++ while len(dl.running) >= opts.max_connections: ++ perform() ++ ++ if opts.mirror_group: ++ mg, failed, removed = opts.mirror_group ++ ++ # find the best mirror ++ best = None ++ best_speed = None ++ for mirror in mg.mirrors: ++ key = mirror['mirror'] ++ if key in removed: continue ++ ++ # estimate mirror speed ++ speed = _TH.estimate(key) ++ speed /= 1 + host_con.get(key, 0) ++ ++ # 2-tuple to select mirror with least failures ++ speed = -failed.get(key, 0), speed ++ if best is None or speed > best_speed: ++ best = mirror ++ best_speed = speed ++ ++ if best is None: ++ opts.exception = URLGrabError(256, _('No more mirrors to try.')) ++ _run_callback(opts.failfunc, opts) ++ continue ++ ++ # update the current mirror and limit ++ key = best['mirror'] ++ limit = best.get('kwargs', {}).get('max_connections', 2) ++ opts.async = key, limit ++ ++ # update URL and proxy ++ url = mg._join_url(key, opts.relative_url) ++ url, parts = opts.urlparser.parse(url, opts) ++ opts.find_proxy(url, parts[0]) ++ opts.url = url ++ ++ # check host limit, then start ++ key, limit = opts.async ++ while host_con.get(key, 0) >= limit: ++ perform() ++ start(opts, 1) ++ except 
IOError, e: ++ if e.errno != 4: raise ++ raise KeyboardInterrupt ++ ++ finally: ++ dl.abort() ++ if meter: meter.end() ++ del _async_queue[:] ++ _TH.save() ++ ++ ++##################################################################### ++# Host bandwidth estimation ++##################################################################### ++ ++class _TH: ++ hosts = {} ++ dirty = None ++ ++ @staticmethod ++ def load(): ++ filename = default_grabber.opts.timedhosts ++ if filename and _TH.dirty is None: ++ try: ++ for line in open(filename): ++ host, speed, fail, ts = line.split(' ', 3) ++ _TH.hosts[host] = int(speed), int(fail), int(ts) ++ except IOError: pass ++ _TH.dirty = False ++ ++ @staticmethod ++ def save(): ++ filename = default_grabber.opts.timedhosts ++ if filename and _TH.dirty is True: ++ tmp = '%s.%d' % (filename, os.getpid()) ++ try: ++ f = open(tmp, 'w') ++ for host in _TH.hosts: ++ f.write(host + ' %d %d %d\n' % _TH.hosts[host]) ++ f.close() ++ os.rename(tmp, filename) ++ except IOError: pass ++ _TH.dirty = False ++ ++ @staticmethod ++ def update(url, dl_size, dl_time, ug_err, baseurl=None): ++ _TH.load() ++ ++ # Use hostname from URL. If it's a file:// URL, use baseurl. ++ # If no baseurl, do not update timedhosts. ++ host = urlparse.urlsplit(url).netloc.split('@')[-1] or baseurl ++ if not host: return ++ ++ speed, fail, ts = _TH.hosts.get(host) or (0, 0, 0) ++ now = time.time() ++ ++ if ug_err is None: ++ # k1: the older, the less useful ++ # k2: <500ms readings are less reliable ++ # speeds vary, use 10:1 smoothing ++ k1 = 2**((ts - now) / default_grabber.opts.half_life) ++ k2 = min(dl_time / .500, 1.0) / 10 ++ if k2 > 0: ++ speed = (k1 * speed + k2 * dl_size / dl_time) / (k1 + k2) ++ fail = 0 ++ elif getattr(ug_err, 'code', None) == 404: ++ fail = 0 # alive, at least ++ else: ++ fail += 1 # seems dead ++ ++ _TH.hosts[host] = speed, fail, now ++ _TH.dirty = True ++ ++ @staticmethod ++ def estimate(baseurl): ++ _TH.load() ++ ++ # Use just the hostname, unless it's a file:// baseurl. ++ host = urlparse.urlsplit(baseurl).netloc.split('@')[-1] or baseurl ++ ++ default_speed = default_grabber.opts.default_speed ++ try: speed, fail, ts = _TH.hosts[host] ++ except KeyError: return default_speed ++ ++ speed *= 2**-fail ++ k = 2**((ts - time.time()) / default_grabber.opts.half_life) ++ speed = k * speed + (1 - k) * default_speed ++ return speed ++ ++##################################################################### + # TESTING + def _main_test(): + try: url, filename = sys.argv[1:3] diff --git a/urlgrabber/mirror.py b/urlgrabber/mirror.py -index dad410b..8731aed 100644 +index dad410b..ac78b34 100644 --- a/urlgrabber/mirror.py +++ b/urlgrabber/mirror.py -@@ -90,7 +90,7 @@ CUSTOMIZATION +@@ -76,6 +76,9 @@ CUSTOMIZATION + 'grabber' is omitted, the default grabber will be used. If + kwargs are omitted, then (duh) they will not be used. + ++ kwarg 'max_connections' is used to store the max connection ++ limit of this mirror. ++ + 3) Pass keyword arguments when instantiating the mirror group. + See, for example, the failure_callback argument. 
+ +@@ -90,7 +93,8 @@ CUSTOMIZATION import random import thread # needed for locking to make this threadsafe -from grabber import URLGrabError, CallbackObject, DEBUG +from grabber import URLGrabError, CallbackObject, DEBUG, _to_utf8 ++from grabber import _run_callback, _do_raise def _(st): return st -@@ -263,7 +263,8 @@ class MirrorGroup: +@@ -184,6 +188,7 @@ class MirrorGroup: + + obj.exception = < exception that was raised > + obj.mirror = < the mirror that was tried > ++ obj.tries = < the number of mirror tries so far > + obj.relative_url = < url relative to the mirror > + obj.url = < full url that failed > + # .url is just the combination of .mirror +@@ -263,7 +268,8 @@ class MirrorGroup: def _parse_mirrors(self, mirrors): parsed_mirrors = [] for m in mirrors: @@ -995,8 +1670,44 @@ index dad410b..8731aed 100644 parsed_mirrors.append(m) return parsed_mirrors +@@ -382,7 +388,9 @@ class MirrorGroup: + try: del kw[k] + except KeyError: pass + ++ tries = 0 + while 1: ++ tries += 1 + mirrorchoice = self._get_mirror(gr) + fullurl = self._join_url(mirrorchoice['mirror'], gr.url) + kwargs = dict(mirrorchoice.get('kwargs', {})) +@@ -399,13 +407,24 @@ class MirrorGroup: + obj.mirror = mirrorchoice['mirror'] + obj.relative_url = gr.url + obj.url = fullurl ++ obj.tries = tries + self._failure(gr, obj) + + def urlgrab(self, url, filename=None, **kwargs): + kw = dict(kwargs) + kw['filename'] = filename ++ if kw.get('async'): ++ # enable mirror failovers in async path ++ kw['mirror_group'] = self, {}, set() ++ kw['relative_url'] = url ++ else: ++ kw.pop('failfunc', None) + func = 'urlgrab' +- return self._mirror_try(func, url, kw) ++ try: ++ return self._mirror_try(func, url, kw) ++ except URLGrabError, e: ++ obj = CallbackObject(url=url, filename=filename, exception=e, **kwargs) ++ return _run_callback(kwargs.get('failfunc', _do_raise), obj) + + def urlopen(self, url, **kwargs): + kw = dict(kwargs) diff --git a/urlgrabber/progress.py b/urlgrabber/progress.py -index dd07c6a..3d7e99a 100644 +index dd07c6a..4c126c5 100644 --- a/urlgrabber/progress.py +++ b/urlgrabber/progress.py @@ -211,6 +211,21 @@ def text_meter_total_size(size, downloaded=0): @@ -1154,7 +1865,7 @@ index dd07c6a..3d7e99a 100644 self.fo.flush() finally: self._lock.release() -@@ -502,15 +552,28 @@ class TextMultiFileMeter(MultiFileMeter): +@@ -502,18 +552,30 @@ class TextMultiFileMeter(MultiFileMeter): self._lock.acquire() try: format = "%-30.30s %6.6s %8.8s %9.9s" @@ -1187,8 +1898,27 @@ index dd07c6a..3d7e99a 100644 + self.fo.write(out) finally: self._lock.release() - self._do_update_meter(meter, now) -@@ -658,6 +721,8 @@ def format_time(seconds, use_hours=0): +- self._do_update_meter(meter, now) + + def _do_failure_meter(self, meter, message, now): + self._lock.acquire() +@@ -536,15 +598,6 @@ class TextMultiFileMeter(MultiFileMeter): + pass + finally: + self._lock.release() +- +- def _do_end(self, now): +- self._do_update_meter(None, now) +- self._lock.acquire() +- try: +- self.fo.write('\n') +- self.fo.flush() +- finally: +- self._lock.release() + + ###################################################################### + # support classes and functions +@@ -658,6 +711,8 @@ def format_time(seconds, use_hours=0): if seconds is None or seconds < 0: if use_hours: return '--:--:--' else: return '--:--'
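
The compact _dumps()/_loads() serializer added to grabber.py above is what the parent process and the external downloader use to pass option values on a single line. A minimal round-trip sketch, assuming the patched urlgrabber.grabber is importable; the tuple is an arbitrary example value:

# Round-trip through the compact serializer (Python 2); the value is illustrative.
from urlgrabber.grabber import _dumps, _loads

value = ('http://example.com/pkg.rpm', [1, 2.5, None, True], 'a b%c')
wire = _dumps(value)
# The encoded form never contains ' ' or '\n', so it fits one key=value token.
assert ' ' not in wire and '\n' not in wire
assert _loads(wire) == value
print wire   # ('http://example.com/pkg.rpm',[1,2.5,None,True],'a%20b%25c')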
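The helper installed to libexec speaks a simple line protocol: each request is one line of space-separated key=value pairs (values encoded with _dumps()), and each reply line is either "<id> <bytes>" progress or "<id> <size> <downloaded> <seconds> OK|<errno> <message>". A hand-driven sketch of that exchange; the helper path follows the setup.py hunk above, while the URL and target filename are made-up:

# Drive the external downloader helper directly (Python 2); normally
# _ExternalDownloader does this. URL and target filename are illustrative.
import subprocess
from urlgrabber.grabber import _dumps

helper = subprocess.Popen('/usr/libexec/urlgrabber-ext-down',
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
request = ' '.join(['url=' + _dumps('http://example.com/some.rpm'),
                    'filename=' + _dumps('/tmp/some.rpm')])
helper.stdin.write(request + '\n')
helper.stdin.close()              # EOF makes the helper exit once the queue drains
for reply in helper.stdout:       # e.g. "1 12345 12345 0.412 OK"
    print reply.rstrip()
helper.wait()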
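Putting the pieces together, the intended client-side flow is: queue requests with the async option, optionally through a MirrorGroup whose per-mirror kwargs carry max_connections, then let parallel_wait() drain the queue. A usage sketch, not documented API; the mirror URLs, package names, and limits are made-up:

# Queued downloads with mirror failover (Python 2); names and limits are examples.
from urlgrabber.grabber import parallel_wait, default_grabber
from urlgrabber.mirror import MirrorGroup

mg = MirrorGroup(default_grabber, [
    {'mirror': 'http://mirror-a.example.com/repo/', 'kwargs': {'max_connections': 4}},
    {'mirror': 'http://mirror-b.example.com/repo/'},   # scheduler falls back to 2
])

def failed(obj):
    # obj carries .url and .exception, as set up in the failfunc path above
    print 'giving up on %s: %s' % (obj.url, obj.exception)

for name in ('foo-1.0.rpm', 'bar-2.1.rpm'):
    # async=(key, limit) only queues the request; with a MirrorGroup the
    # key/limit pair is reassigned once a mirror is actually selected.
    mg.urlgrab(name, '/tmp/' + name, async=('repo', 2), failfunc=failed)

parallel_wait(meter='text')   # drains the queue through the downloader pool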
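The timedhosts bookkeeping in _TH reduces to two small formulas: update() blends the stored per-host speed with the new reading (older entries and sub-500ms transfers get less weight), and estimate() halves the speed per recorded failure and decays stale entries toward default_speed. A standalone sketch of the same arithmetic; half_life and default_speed stand in for the grabber options of the same name and their values here are assumptions:

# Standalone rework of the _TH.update()/_TH.estimate() math (Python 2).
# The two constants mirror grabber options; the values here are assumptions.
half_life     = 30.0 * 24 * 3600     # seconds
default_speed = 500e3                # bytes/sec

def blend(old_speed, age, dl_size, dl_time):
    # k1: the older the stored reading, the less weight it keeps
    # k2: transfers under 500 ms are trusted less; 10:1 smoothing overall
    k1 = 2 ** (-age / half_life)
    k2 = min(dl_time / .500, 1.0) / 10
    if k2 > 0:
        return (k1 * old_speed + k2 * dl_size / dl_time) / (k1 + k2)
    return old_speed

def estimate(speed, fail, age):
    # each recorded failure halves the estimate; stale data decays to the default
    speed *= 2 ** -fail
    k = 2 ** (-age / half_life)
    return k * speed + (1 - k) * default_speed

print blend(200e3, 3600.0, 4e6, 2.0)   # one 2 MB/s sample pulls 200 kB/s upward
print estimate(200e3, 1, 3600.0)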