Index: trunk/extensions/Offline/indexer.py |
— | — | @@ -0,0 +1,139 @@ |
| 2 | +#!/usr/bin/env python |
| 3 | +import os |
| 4 | +import fcntl |
| 5 | +import subprocess |
| 6 | +import bz2 |
| 7 | +import re |
| 8 | +import xapian |
| 9 | +import sys |
| 10 | +import string |
| 11 | + |
class Indexer(object):
    """Split a bzip2-compressed XML dump and index its article titles.

    The work is done in two resumable steps, `split_dump` and `index`.
    Each step is guarded by a `Mutex` stored in `base_dir`, so a step that
    already completed is skipped on the next run.
    """

    def __init__(self, xmlbz2_path):
        """Remember the dump location and derive the working directory.

        Args:
            xmlbz2_path: path to the ``.xml.bz2`` dump, relative or absolute.
        """
        # dirname() of a bare filename is "", which makes os.makedirs() and
        # the database open fail later; resolve against the cwd first.
        self.base_dir = os.path.dirname(os.path.abspath(xmlbz2_path))
        self.xmlbz2_path = xmlbz2_path
        # The split files live next to the dump. A dedicated "wiki-splits"
        # subdirectory was considered by the original author but is disabled.
        self.splits_dir = self.base_dir

    def Semaphored(f):
        """Decorate a step so it runs at most once, under a `Mutex`.

        The wrapped method is skipped (returning None) when the mutex cannot
        be taken because the step is already marked as done. The step is
        marked done only if it returns normally; if it raises, the lock is
        dropped without the done marker so the step can be retried.
        """
        import functools  # local import: not part of the file's import block

        @functools.wraps(f)
        def f_wrapped(self, *args, **kw):
            m = Mutex(self.base_dir, f.__name__)
            if not m.lock():
                return None
            try:
                result = f(self, *args, **kw)
            except BaseException:
                # Includes KeyboardInterrupt: never leave a stale lock file.
                m.unlock()
                raise
            m.done()
            return result
        return f_wrapped

    @Semaphored
    def split_dump(self):
        """Run ``bzip2recover`` on the dump to produce the ``rec*`` pieces.

        Any file whose name starts with ``rec`` under `splits_dir` is removed
        first so pieces from an earlier, interrupted run are not mixed in.

        Raises:
            subprocess.CalledProcessError: if ``bzip2recover`` exits non-zero.
            OSError: if the directory cannot be created or a file removed.
        """
        if not os.path.exists(self.splits_dir):
            os.makedirs(self.splits_dir)
        else:
            for dirpath, subdirs, files in os.walk(self.splits_dir):
                for name in files:
                    if name.startswith('rec'):
                        os.remove(os.path.join(dirpath, name))

        # check_call (not call): a failed split must not be reported as a
        # success and must not be recorded as done by the decorator.
        subprocess.check_call(
            ["bzip2recover", os.path.abspath(self.xmlbz2_path)])
        print("Perfect, spliting complete. You should remove the original dumpfile.")

    # NOTE: a "subdirize" step (re-splitting an initial split into smaller
    # files) was sketched by the original author but never implemented.

    @Semaphored
    def index(self):
        """Record every ``<title>`` found in the split files in the database.

        Each ``rec*.bz2`` file under `splits_dir` is decompressed line by
        line. For every title line, the file name, the summed length of the
        decompressed lines that precede it in that file, and the stripped
        title text are passed to `Db.add`.
        """
        db = Db(self.splits_dir)
        article_title_re = re.compile(' *<title>([^<]+)</title>')
        split_name_re = re.compile(r'rec.*\.bz2')
        for dirpath, subdirs, files in os.walk(self.splits_dir):
            for f in files:
                if not split_name_re.match(f):
                    continue
                offset = 0
                plain = bz2.BZ2File(os.path.join(dirpath, f))
                try:
                    for line in plain:
                        title_match = article_title_re.match(line)
                        if title_match:
                            db.add(f, offset, title_match.group(1).strip())
                        offset += len(line)
                finally:
                    # Close explicitly; a dump yields a very large number of
                    # split files and handles must not pile up.
                    plain.close()
        print("Index built - we are done")
| 65 | + |
| 66 | + |
class Db(object):
    """Writable Xapian database plus the term generator used to fill it."""

    def __init__(self, db_path):
        """Open (or create) the database at `db_path` and set up indexing.

        Any exception raised by Xapian propagates to the caller.
        """
        # Open the database for update, creating a new database if necessary.
        self.database = xapian.WritableDatabase(db_path, xapian.DB_CREATE_OR_OPEN)

        term_generator = xapian.TermGenerator()
        english_stemmer = xapian.Stem("english")  # XXX
        self.indexer = term_generator
        self.stemmer = english_stemmer
        self.indexer.set_stemmer(self.stemmer)

    def add(self, filename, offset, article_title):
        """Store one article as a ``filename:offset:title`` document.

        The joined string is used both as the document data and as the text
        handed to the term generator. A StopIteration raised while adding is
        ignored; every other exception propagates.
        """
        try:
            fields = [filename, str(offset), article_title]
            para = ":".join(fields)

            doc = xapian.Document()
            doc.set_data(para)

            self.indexer.set_document(doc)
            self.indexer.index_text(para)

            self.database.add_document(doc)
        except StopIteration:
            return
| 94 | + |
| 95 | + |
class Mutex(object):
    """Lock file ``.<name>`` in a directory, with a ``-done`` marker.

    The lock is a `fcntl.flock` on the open lock file. A second lock file,
    ``.<name>-done``, is left behind by `done` to record that the guarded
    step has completed; `lock` refuses to run a step that is marked done.
    """

    def __init__(self, dir_path, name):
        """Describe the lock; nothing is created on disk until `lock`.

        Args:
            dir_path: directory that holds the lock file.
            name: step name; the lock file is ``.<name>`` in `dir_path`.
        """
        self.dir_path = dir_path
        self.name = name
        self.path = os.path.join(dir_path, "." + name)
        # Open lock-file handle while the lock is held. It must stay
        # referenced: closing the descriptor releases the flock.
        self._handle = None

    def done_mutex(self):
        """Return the `Mutex` whose lock file marks this step as done."""
        return Mutex(self.dir_path, self.name + "-done")

    def done(self):
        """Create the done marker file, then release and remove this lock."""
        marker = self.done_mutex()
        marker.lock()
        # Only the marker file's existence matters; do not keep it locked.
        marker._release()
        self.unlock()

    def busy(self):
        """Return True if the lock file exists on disk."""
        return os.path.exists(self.path)

    def lock(self):
        """Try to take the lock without blocking.

        Returns:
            True if the lock was taken and the step is not yet marked done.
            False if the step is already marked done; the lock file is
            removed again in that case.

        Raises:
            IOError/OSError: if the lock file cannot be opened or the flock
                cannot be obtained (for example, it is already held).
        """
        handle = open(self.path, "w")
        try:
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            handle.close()
            print("Failed to obtain lock for process: %s" % self.name)
            raise
        self._handle = handle

        if not self.done_mutex().busy():
            return True
        print("Already completed: %s" % self.name)
        self.unlock()
        return False

    def unlock(self):
        """Release the flock if held and remove the lock file if present."""
        self._release()
        if os.path.exists(self.path):
            os.remove(self.path)

    def _release(self):
        """Drop the flock and close the handle, leaving the file on disk."""
        handle, self._handle = self._handle, None
        if handle is None:
            return
        try:
            fcntl.flock(handle, fcntl.LOCK_UN)
        finally:
            handle.close()
| 133 | + |
if __name__ == '__main__':
    # Exactly one argument is expected: the path to the .xml.bz2 dump.
    if len(sys.argv) != 2:
        print("Usage: indexer.py DUMP.xml.bz2")
    else:
        dump_indexer = Indexer(xmlbz2_path=sys.argv[1])
        # Split first, then index the resulting pieces.
        dump_indexer.split_dump()
        dump_indexer.index()
Property changes on: trunk/extensions/Offline/indexer.py |
___________________________________________________________________ |
Added: svn:eol-style |
1 | 141 | + native |
Added: svn:executable |
2 | 142 | + * |