[mirrorbrain-commits] [opensuse-svn] r7590 - in trunk/tools/download-redirector-v2/mirrordoctor: . mb

From: Novell Forge SVN <noreply_at_novell.com>
Date: Tue, 23 Jun 2009 10:37:02 -0600 (MDT)
Author: poeml
Date: 2009-06-23 10:36:59 -0600 (Tue, 23 Jun 2009)
New Revision: 7590

Modified:
   trunk/tools/download-redirector-v2/mirrordoctor/mb/testmirror.py
   trunk/tools/download-redirector-v2/mirrordoctor/mb/util.py
   trunk/tools/download-redirector-v2/mirrordoctor/mirrordoctor.py
Log:
Parallelizing mirror probing.

For this new feature, the Python module "processing" or "multiprocessing"
needs to be installed.  If neither of them is found, the fallback behaviour is
to probe serially, as before.

This affects the "probefile" and the "file ls --probe" commands. It also
affects the scanner, in that it parallelizes the pre-scan check for existing
directories, if the scanner is started to scan a subdirectory only.
The mirrorprobe is not affected by these changes; it runs in threads.

Other user visible changes:
- mb probefile: 
  - implemented downloading and displaying the content.
  - an --urls switch was added, to select the kind of URLs to be probed.
    --urls=scan probes the URLs that would be used in scanning. 
    --urls=http probes the (HTTP) base URLs used in redirection.
    --urls=all probes all registered URLs.
  - proxy envvars are unset before probing.

Internal changes:
- mb.testmirror module:
  - req() is basically replaced by the probe() function, which takes Sample
    instances as arguments. A req() function (which wraps around probe()) is
    retained for backwards compatibility, although it's no longer in use by us.
- mb.util: 
  - a class named Sample was added which can be used to schedule URLs to probe,
    as well as collect the results.
    A reason for this is that the multiprocessing module needs to pass objects
    that can be pickled, which isn't the case for SQLObject's native objects.
    In addition, the class is easily extensible and much easier to use.
  - the time format output by the timer was slightly changed.


Modified: trunk/tools/download-redirector-v2/mirrordoctor/mb/testmirror.py
===================================================================
--- trunk/tools/download-redirector-v2/mirrordoctor/mb/testmirror.py	2009-06-23 16:04:57 UTC (rev 7589)
+++ trunk/tools/download-redirector-v2/mirrordoctor/mb/testmirror.py	2009-06-23 16:36:59 UTC (rev 7590)
_at_@ -13,8 +13,10 @@
 socket.setdefaulttimeout(TIMEOUT)
 
 def access_http(url):
-    r = urllib2.urlopen(url).read()
-    print r
+    from mb.util import Sample
+    S = Sample('', url, '', get_content=True)
+    probe(S)
+    return S.content
 
 
 def dont_use_proxies():
_at_@ -24,28 +26,33 @@
             del os.environ[i]
 
 
-def req(baseurl, filename, http_method='GET', do_digest=False):
+def req(baseurl, filename, http_method='GET', get_digest=False):
+    """compatibility method that wraps around probe(). It was used
+    before probe() existed and is probably not needed anymore.
+    """
+    from mb.util import Sample
+    S = Sample('', baseurl, filename, get_digest=get_digest)
+    probe(S, http_method=http_method)
+    return (S.http_code, S.digest)
 
-    url = baseurl + filename
-    worked = False
-    digest = None
 
-    if url.startswith('http://') or url.startswith('ftp://'):
-        req = urllib2.Request(url)
-        if url.startswith('http://') and http_method=='HEAD':
+def probe(S, http_method='GET'):
+
+    if S.scheme in ['http', 'ftp']:
+        req = urllib2.Request(S.probeurl)
+        if S.scheme == 'http' and http_method=='HEAD':
             # not for FTP URLs
             req.get_method = lambda: 'HEAD'
 
         try:
             response = urllib2.urlopen(req)
-            worked = True
         except KeyboardInterrupt:
             print >>sys.stderr, 'interrupted!'
             raise
         except:
-            return (0, digest)
+            return S
 
-        if do_digest:
+        if S.get_digest:
             try:
                 t = tempfile.NamedTemporaryFile()
                 while 1:
_at_@ -53,23 +60,30 @@
                     if not buf: break
                     t.write(buf)
                 t.flush()
-                digest = mb.util.dgst(t.name)
+                S.digest = mb.util.dgst(t.name)
                 t.close()
             except:
-                return (0, digest)
+                return S
+        if S.get_content:
+            S.content = response.read()
 
-        if url.startswith('http://'):
-            rc = response.code
-        elif url.startswith('ftp://'):
+        if S.scheme == 'http':
+            S.http_code = response.code
+            if S.http_code == 200:
+                S.has_file = True
+            else:
+                raise 'unhandled HTTP response code %s' % S.http_code
+        elif S.scheme == 'ftp':
+            # this works for directories. Not tested for files yet
             out = response.readline()
             if len(out):
-                rc = 1
+                has_file = True
             else:
-                rc = 0
+                has_file = False
 
-        return (rc, digest)
+        return S
 
-    elif url.startswith('rsync://'):
+    elif S.scheme == 'rsync':
 
         try:
             tmpdir = tempfile.mkdtemp(prefix='mb_probefile_')
_at_@ -83,25 +97,126 @@
             # presumabely runs a really old rsync server. The system seems to be 
             # SuSE Linux 8.2.)
             # poeml, Mon Jun 22 18:10:33 CEST 2009
-            cmd = 'rsync -d --timeout=%d %s %s/' % (TIMEOUT, url, tmpdir)
+            cmd = 'rsync -d --timeout=%d %s %s/' % (TIMEOUT, S.probeurl, tmpdir)
             (rc, out) = commands.getstatusoutput(cmd)
-            targetfile = os.path.join(tmpdir, os.path.basename(filename))
-            worked = os.path.exists(targetfile)
-            if worked and do_digest:
-                digest = mb.util.dgst(targetfile)
+            targetfile = os.path.join(tmpdir, os.path.basename(S.filename))
+            if os.path.exists(targetfile):
+                S.has_file = True
+            if S.has_file and S.get_digest:
+                S.digest = mb.util.dgst(targetfile)
+            if S.has_file and S.get_content:
+                S.content = open(targetfile).read()
 
-
         finally:
             shutil.rmtree(tmpdir, ignore_errors=True)
 
-        if rc != 0:
-            return (0, digest)
+        return S
 
-        if worked:
-            return (200, digest)
-        else:
-            return (0, digest)
+    else:
+        raise 'unknown URL type: %r' % S.probebaseurl
 
+
+
+def get_all_urls(mirror):
+    r = []
+    if mirror.baseurl:      r.append(mirror.baseurl)
+    if mirror.baseurlFtp:   r.append(mirror.baseurlFtp)
+    if mirror.baseurlRsync: r.append(mirror.baseurlRsync)
+    return r
+
+
+def get_best_scan_url(mirror):
+    return mirror.baseurlRsync or mirror.baseurlFtp or mirror.baseurl or None
+
+
+def make_probelist(mirrors, filename, url_type='http', get_digest=False, get_content=False):
+    """return list of Sample instances, in order to be probed.
+    The Sample instances are used to hold the probing results.
+    """
+    from mb.util import Sample
+    if url_type == 'http':
+        return [ Sample(i.identifier, i.baseurl, filename, 
+                        get_digest=get_digest, get_content=get_content) 
+                        for i in mirrors ]
+    elif url_type == 'scan':
+        return [ Sample(i.identifier, get_best_scan_url(i), filename, 
+                        get_digest=get_digest, get_content=get_content) 
+                        for i in mirrors ]
+    elif url_type == 'all':
+        return [ Sample(i.identifier, url, filename, 
+                        get_digest=get_digest, get_content=get_content) 
+                        for i in mirrors 
+                        for url in get_all_urls(i) ]
     else:
-        raise 'unknown URL type: %r' % baseurl
+        raise 'unknown url_type value: %r' % url_type
 
+
+def probe_report(m):
+    m = probe(m)
+    #print 'checked %s' % m.probeurl
+    print '.',
+    sys.stdout.flush()
+    return m
+
+# TODO:
+# for do_scan:
+#  - get list of mirrors that have directory or file foo
+#    find only the first URL
+# for do_probefile:
+#  - get list of mirrors that have directory or file foo, checking
+#    all URLs
+#  - optionally with md5 sums
+# for the mirrorprobe?
+# for general mirror testing?
+# for timestamp fetching? -> get the content of the timestamp file
+
+# use the multiprocessing module if available (Python 2.6/3.0)
+# fall back to the processing module
+# if none is availabe, serialize
+def mirrors_have_file(mirrors, filename, url_type='all', 
+                      get_digest=False, get_content=False):
+    mirrors = [ i for i in mirrors ]
+
+    # we create a list of "simple" objects that can be serialized (pickled) by the
+    # multiprocessing modules. That doesn't work with SQLObjects's result objects.
+    return probes_run(make_probelist(mirrors, filename, 
+                                     url_type=url_type, 
+                                     get_digest=get_digest, 
+                                     get_content=get_content))
+
+def lookups_probe(mirrors, get_digest=False, get_content=False):
+    from mb.util import Sample
+    probelist = [ Sample(i['identifier'], i['baseurl'], i['path'], 
+                         get_digest=get_digest, get_content=get_content) 
+                  for i in mirrors ]
+
+    return probes_run(probelist)
+
+
+def probes_run(probelist):
+    mp_mod = None
+    try:
+        from multiprocessing import Pool
+        mp_mod = 'multiprocessing'
+    except:
+        pass
+    try:
+        from processing import Pool
+        mp_mod = 'processing'
+    except:
+        if len(probelist) > 8:
+            print '>>> No multiprocessing module was found installed. For parallelizing'
+            print '>>> probing, install the "processing" or "multiprocessing" Python module.'
+
+
+    if mp_mod in ['processing', 'multiprocessing']:
+        p = Pool(24)
+        result = p.map_async(probe_report, probelist)
+        #print result.get(timeout=20)
+        return result.get()
+
+    else:
+        res = []
+        for i in probelist:
+            res.append(probe_report(i))
+        return res

Modified: trunk/tools/download-redirector-v2/mirrordoctor/mb/util.py
===================================================================
--- trunk/tools/download-redirector-v2/mirrordoctor/mb/util.py	2009-06-23 16:04:57 UTC (rev 7589)
+++ trunk/tools/download-redirector-v2/mirrordoctor/mb/util.py	2009-06-23 16:36:59 UTC (rev 7590)
_at_@ -26,6 +26,43 @@
         return '%s (%s AS%s)' % (self.ip, self.prefix, self.asn)
 
 
+class Sample:
+    """used for probe results."""
+    def __init__(self, identifier, probebaseurl, filename, 
+                 get_digest=False, get_content=False):
+        self.identifier = identifier
+        self.probebaseurl = probebaseurl
+        self.filename = filename
+        self.has_file = False
+        self.http_code = None
+        self.get_digest = get_digest
+        self.digest = None
+        self.get_content = get_content
+        self.content = None
+
+        if self.probebaseurl.startswith('http://'):
+            self.scheme = 'http'
+        elif self.probebaseurl.startswith('ftp://'): 
+            self.scheme = 'ftp'
+        elif self.probebaseurl.startswith('rsync://') \
+                or ('://' not in self.probebaseurl and '::' in self.probebaseurl):
+            self.scheme = 'rsync'
+        else:
+            raise 'unknown url type: %s' % self.probebaseurl
+
+        self.probeurl = self.probebaseurl.rstrip('/') + '/' + self.filename.lstrip('/')
+
+
+    def __str__(self):
+        s = 'M: %s %s, has_file=%s' \
+                % (self.identifier, self.probeurl, self.has_file)
+        if self.http_code:
+            s += ', http_code=%s' % self.http_code
+        if self.digest:
+            s += ', digest=%s' % self.digest
+        return s
+
+
 def data_url(basedir, path):
     import os, base64
 
_at_@ -119,11 +156,11 @@
     global t_start
 
     t_end = time.time()
-    t_delta = int(t_end - t_start)
+    t_delta = t_end - t_start
     if t_delta > 60 * 60: 
         return '%s hours' % round((t_delta / 60 / 60), 1)
     elif t_delta > 60:
         return '%s minutes' % round((t_delta / 60), 1)
     else:
-        return '%s seconds' % t_delta
+        return '%s seconds' % int(t_delta)
 

Modified: trunk/tools/download-redirector-v2/mirrordoctor/mirrordoctor.py
===================================================================
--- trunk/tools/download-redirector-v2/mirrordoctor/mirrordoctor.py	2009-06-23 16:04:57 UTC (rev 7589)
+++ trunk/tools/download-redirector-v2/mirrordoctor/mirrordoctor.py	2009-06-23 16:36:59 UTC (rev 7590)
_at_@ -346,6 +346,10 @@
     def do_test(self, subcmd, opts, identifier):
         """${cmd_name}: test if a mirror is working
 
+        This does only rudimentary checking for now. It only does a request
+        on the base URL. But more checks could easily be added.
+        (See the implementation of do_probefile() for ideas.)
+
         ${cmd_usage}
         ${cmd_option_list}
         """
_at_@ -353,11 +357,15 @@
         mirror = lookup_mirror(self, identifier)
         print mirror.baseurl
         import mb.testmirror
-        mb.testmirror.access_http(mirror.baseurl)
+        print mb.testmirror.access_http(mirror.baseurl)
 
 
+    _at_cmdln.option('--content', action='store_true',
+                        help='download and show the content')
     _at_cmdln.option('--md5', action='store_true',
                         help='download and show the md5 sum')
+    _at_cmdln.option('--urls', dest='url_type', metavar='TYPE', default='scan',
+                        help='type of URLs to be probed: [http|all|scan]')
     _at_cmdln.option('-m', '--mirror', 
                         help='probe only on this mirror')
     _at_cmdln.option('-a', '--all-mirrors', action='store_true',
_at_@ -387,35 +395,36 @@
                          AND(self.conn.Server.q.statusBaseurl, 
                              self.conn.Server.q.enabled))
 
-        found_mirrors = 0
         try:
+            mirrors_have_file = mb.testmirror.mirrors_have_file(mirrors, filename, 
+                                                               url_type=opts.url_type, get_digest=opts.md5,
+                                                               get_content=opts.content)
+            print
+            found_mirrors = 0
             for mirror in mirrors:
+                for sample in mirrors_have_file:
+                    if mirror.identifier == sample.identifier:
 
-                # TODO: add a nice library function for this
-                for baseurl in [mirror.baseurl, mirror.baseurlFtp, mirror.baseurlRsync]:
-                    if baseurl == None or baseurl == '':
-                        continue
-                    (response, md5) = mb.testmirror.req(baseurl, filename, do_digest=opts.md5)
-                    if opts.hide_negative and response != 200:
-                        continue
-                    if opts.md5:
-                        print "%3d %-30s %-32s %s" \
-                                % (response, mirror.identifier, md5 or '', os.path.join(baseurl, filename))
-                    else:
-                        print "%3d %-30s %s" \
-                                % (response, mirror.identifier, os.path.join(baseurl, filename))
+                        s = "%d %-30s" % (sample.has_file, sample.identifier)
+                        if opts.md5:
+                            s += " %-32s" % (sample.digest or '')
+                        s += " %s" % (os.path.join(sample.probeurl, filename))
+                        if sample.http_code:
+                            s += " http=%s" % (sample.http_code)
+                        print s
+                        if opts.content and sample.content:
+                            print repr(sample.content)
 
-                    if response == 200: found_mirrors += 1
+                        if sample.has_file: found_mirrors += 1
 
         except KeyboardInterrupt:
             print >>sys.stderr, 'interrupted!'
             return 1
 
+        print 'Found:', found_mirrors
 
-        print 'Broken:', found_mirrors
 
 
-
     def do_edit(self, subcmd, opts, identifier):
         """${cmd_name}: edit a new mirror entry in $EDITOR
 
_at_@ -662,23 +671,19 @@
             mirrors_to_scan = [ i for i in mirrors ]
         else:
             print 'Checking for existance of %r directory' % opts.directory
+            mirrors_have_file = mb.testmirror.mirrors_have_file(mirrors, opts.directory, url_type='scan')
+            print
             for mirror in mirrors:
-                # check whether the mirror has the requested directory, and if yes, add it
-                # to the list of mirrors to be scanned. Try URLs in order of efficacy for scanning.
-                has_dir = 0
-                for u in [mirror.baseurlRsync, mirror.baseurlFtp, mirror.baseurl]:
-                    if u == None or u == '':
-                        continue
-                    has_dir = mb.testmirror.req(u, opts.directory)[0]
-                    if has_dir:
-                        if self.options.debug:
-                            print '%s: scheduling scan.' % mirror.identifier
-                        mirrors_to_scan.append(mirror)
-                    break
-                if not has_dir:
-                    if self.options.debug:
-                        print '%s: directory %s not found. Skipping.' % (mirror.identifier, opts.directory)
-                    mirrors_skipped.append(mirror.identifier)
+                for sample in mirrors_have_file:
+                    if mirror.identifier == sample.identifier:
+                        if sample.has_file:
+                            if self.options.debug:
+                                print '%s: scheduling scan.' % mirror.identifier
+                            mirrors_to_scan.append(mirror)
+                        else:
+                            if self.options.debug:
+                                print '%s: directory %s not found. Skipping.' % (mirror.identifier, opts.directory)
+                            mirrors_skipped.append(mirror.identifier)
 
             if len(mirrors_to_scan):
                 print 'Scheduling scan on:'
_at_@ -696,7 +701,12 @@
         if self.options.debug:
             print cmd
         
+        if opts.directory:
+            print 'Completed in', mb.util.timer_elapsed()
+            mb.util.timer_start()
+
         sys.stdout.flush()
+
         import os
         rc = os.system(cmd)
 
_at_@ -800,6 +810,7 @@
 
         import mb.files
         import mb.testmirror
+        mb.testmirror.dont_use_proxies()
 
         if opts.md5:
             opts.probe = True
_at_@ -816,28 +827,31 @@
         if action == 'ls':
             rows = mb.files.ls(self.conn, path)
 
-            for row in rows:
-                
-                if not mirror or (str(mirror.identifier) == row['identifier']):
-                    if opts.probe:
-                        (response, md5) = mb.testmirror.req(row['baseurl'],
-                                                            row.get('path'),
-                                                            do_digest=opts.md5)
-                    else:
-                        response = '   '
+            if opts.probe:
+                samples = mb.testmirror.lookups_probe(rows, get_digest=opts.md5, get_content=False)
+            else:
+                samples = []
+
+            try:
+                for row in rows:
                     print '%s %s %4d %s %s %-30s ' % \
                             (row['region'].lower(), row['country'].lower(),
                              row['score'], 
                              row['enabled'] == 1 and 'ok      ' or 'disabled',
                              row['status_baseurl'] == 1 and 'ok  ' or 'dead',
                              row['identifier']),
-                    if opts.probe:
-                        print '%3s' % response,
-                    if opts.md5 and opts.probe:
-                        print md5,
+                    for sample in samples:
+                        if row['identifier'] == sample.identifier:
+                            if opts.probe:
+                                print '%3s' % (sample.http_code or '   '),
+                            if opts.probe and opts.md5:
+                                print (sample.digest or ' ' * 32),
                     if opts.url:
                         print row['baseurl'] + row['path'],
                     print
+            except KeyboardInterrupt:
+                print >>sys.stderr, 'interrupted!'
+                return 1
 
 
         elif action == 'add':

_______________________________________________
Opensuse-svn mailing list
Opensuse-svn_at_forge.novell.com
http://forge.novell.com/mailman/listinfo/opensuse-svn


_______________________________________________
mirrorbrain-commits mailing list
Archive: http://mirrorbrain.org/archive/mirrorbrain-commits/

Note: To remove yourself from this list, send a mail with the content
 	unsubscribe
to the address mirrorbrain-commits-request_at_mirrorbrain.org
Received on Tue Jun 23 2009 - 16:37:32 GMT

This archive was generated by hypermail 2.3.0 : Mon Feb 20 2012 - 23:47:04 GMT