Index: trunk/tools/dumpHTML/queueSlave |
— | — | @@ -10,11 +10,12 @@ |
11 | 11 | os.dup2(fd,2) |
12 | 12 | |
13 | 13 | def dumpHTML(outputFile, *params): |
14 | | - fullParams = ["nice", "nice", "-n15", "php","-n","dumpHTML.php"].extend(params) |
| 14 | + fullParams = ["nice", "-n15", "php","-n","dumpHTML.php"] |
| 15 | + fullParams.extend(params) |
15 | 16 | pid = os.fork() |
16 | 17 | if pid == 0: |
17 | 18 | redirectOutput(outputFile) |
18 | | - os.execvp(params) |
| 19 | + os.execvp("nice", fullParams) |
19 | 20 | sys.exit(1) |
20 | 21 | |
21 | 22 | # Wait for the child to exit (or the parent) |
— | — | @@ -29,37 +30,86 @@ |
30 | 31 | |
31 | 32 | def finishWiki(outputFile, lang): # Run scripts/finish-lang for lang, appending its output to outputFile; skipped when siteDir/<lang>-new is not a directory |
32 | 33 | global edition, siteDir, baseDir |
33 | | - if (os.path.isdir(siteDir+"/"+lang+"-new")): |
34 | | - print "Already compressed "+lang+"\n" |
| 34 | + if (not os.path.isdir(siteDir+"/"+lang+"-new")): |
| 35 | + msg("Already compressed "+lang) |
35 | 36 | return |
36 | | - print "Finishing language "+lang+"\n" |
37 | | - os.system("%(baseDir)/scripts/finish-lang %(lang) %(edition) 2>&1 >> %(outputFile)" % { |
| 37 | + msg("Finishing language "+lang) |
| 38 | + os.system("%(baseDir)s/scripts/finish-lang %(lang)s %(edition)s >> %(outputFile)s 2>&1" % { # stdout is redirected first so that 2>&1 sends stderr to the log file as well |
38 | 39 | 'baseDir' : baseDir, 'lang' : lang, 'edition' : edition, 'outputFile': outputFile }) |
| 40 | + msg("Done") |
39 | 41 | |
40 | 42 | def writeStatus(jobID, status): # Overwrite baseDir/jobs/<jobID> with "<hostname> <pid>" on line 1 and status on line 2 |
41 | 43 | global baseDir |
42 | | - f = open(basedir+"/jobs/"+jobID, "w") |
| 44 | + f = open(baseDir+"/jobs/"+jobID, "w") |
43 | 45 | print >> f, socket.gethostname(), os.getpid() |
44 | 46 | print >> f, status |
45 | 47 | f.close() |
46 | 48 | |
| 49 | +def isStatusMine(jobID): # True only if line 1 of baseDir/jobs/<jobID> names this host and this process ID |
| 50 | + global baseDir |
| 51 | + try: |
| 52 | + f = open(baseDir+"/jobs/"+jobID, "r") |
| 53 | + except IOError: # only a failed open means "missing"; other errors should not be hidden |
| 54 | + msg("Status file is missing") |
| 55 | + return False |
| 56 | + |
| 57 | + fields = f.readline().split() |
| 58 | + f.close() |
| 59 | + if len(fields) != 2: |
| 60 | + msg("Warning: invalid status file") |
| 61 | + return False |
| 62 | + |
| 63 | + if fields[0] == socket.gethostname() and fields[1] == str(os.getpid()): |
| 64 | + return True |
| 65 | + else: |
| 66 | + return False |
| 67 | + |
47 | 68 | def isDone(checkpoint, jobType): # True if the checkpoint file contains the line "<jobType>=done" |
48 | 69 | test = jobType+'=done' |
49 | | - f = open(checkpoint, "r") |
| 70 | + try: f = open(checkpoint, "r") |
| 71 | + except IOError: # a checkpoint file that cannot be opened is treated as "not done" |
| 72 | + return False |
50 | 73 | try: |
51 | 74 | for line in f: |
52 | 75 | if line.rstrip() == test: |
53 | | - return true |
| 76 | + return True |
54 | 77 | finally: |
55 | 78 | f.close() |
| 79 | + return False |
56 | 80 | |
| 81 | +def writeStatusIfMine(jobID, status): # Write status only when isStatusMine(jobID) is true; otherwise just log a message |
| 82 | + if isStatusMine(jobID): |
| 83 | + writeStatus(jobID, status) |
| 84 | + else: |
| 85 | + msg("Not overwriting status file, it doesn't belong to me.") |
57 | 86 | |
| 87 | +def msg(*params): # Print the string arguments joined by single spaces, then flush stdout |
| 88 | + print " ".join(params) |
| 89 | + sys.stdout.flush() |
| 90 | + |
| 91 | +#--------------------------------------------------------------------------------- |
| 92 | + |
| 93 | +hostname = socket.gethostname() |
| 94 | +myPid = os.getpid() |
| 95 | + |
| 96 | +msg("queueSlave on %s %d" % (hostname, myPid)) |
| 97 | + |
58 | 98 | queueHost = sys.argv[1] |
59 | 99 | queuePort = int(sys.argv[2]) |
60 | 100 | baseDir = sys.argv[3] |
61 | 101 | edition = sys.argv[4] |
62 | 102 | siteDir = baseDir+"/wikipedia" |
| 103 | +logDir = baseDir+"/logs" |
| 104 | +jobDir = baseDir+"/jobs" |
| 105 | +checkpointDir = baseDir+"/checkpoints" |
63 | 106 | |
| 107 | +try: os.makedirs(logDir) |
| 108 | +except: pass |
| 109 | +try: os.makedirs(jobDir) |
| 110 | +except: pass |
| 111 | +try: os.makedirs(checkpointDir) |
| 112 | +except: pass |
| 113 | + |
64 | 114 | queueSock = socket.socket() |
65 | 115 | queueSock.connect((queueHost, queuePort)) |
66 | 116 | queueFile = queueSock.makefile() |
— | — | @@ -84,40 +134,43 @@ |
85 | 135 | lang = wiki.replace( 'wiki', '' ) |
86 | 136 | dest = siteDir+"/"+lang+"-new" |
87 | 137 | jobString = wiki+"_" + type + "_" + slice.replace( '/', '_' ) |
88 | | - outputFile = baseDir+"/logs/"+jobString |
89 | | - checkpoint = baseDir+"/checkpoints/"+jobString |
| 138 | + outputFile = logDir+"/"+jobString |
| 139 | + checkpoint = checkpointDir+"/"+jobString |
90 | 140 | |
91 | 141 | if type == "articles": |
92 | 142 | writeStatus(jobID, 'running') |
93 | | - print wiki + ' articles ' + slice |
94 | | - dumpHTML(outputFile, wiki,"--no-shared-desc", #"--force-copy", |
95 | | - "--image-snapshot","--interlang","-d",dest,"--slice",slice, |
| 143 | + msg(wiki + ' articles ' + slice) |
| 144 | + dumpHTML(outputFile, wiki,"--no-shared-desc", #"--force-copy", "--image-snapshot", |
| 145 | + "--interlang","-d",dest,"--slice",slice, |
96 | 146 | "--checkpoint",checkpoint,"--no-overwrite") |
97 | 147 | |
98 | 148 | if isDone(checkpoint, 'everything'): |
99 | | - writeStatus(jobID, 'done') |
| 149 | + msg("Done") |
| 150 | + writeStatusIfMine(jobID, 'done') |
100 | 151 | else: |
101 | | - writeStatus(jobID, 'terminated') |
| 152 | + msg("Terminated, unfinished") |
| 153 | + writeStatusIfMine(jobID, 'terminated') |
102 | 154 | |
103 | 155 | elif type == "shared": |
104 | 156 | writeStatus(jobID, 'running') |
105 | | - print wiki + ' shared ' + slice |
106 | | - dumpHTML(outputFile, wiki,"--shared-desc", #"--force-copy", |
107 | | - "--image-snapshot","--interlang","-d",dest,"--slice",slice, |
| 157 | + msg(wiki + ' shared ' + slice) |
| 158 | + dumpHTML(outputFile, wiki,"--shared-desc", #"--force-copy", "--image-snapshot", |
| 159 | + "--interlang","-d",dest,"--slice",slice, |
108 | 160 | "--checkpoint",checkpoint,"--no-overwrite") |
109 | 161 | if isDone(checkpoint, 'shared image'): |
110 | | - writeStatus(jobID, 'done') |
| 162 | + msg("Done") |
| 163 | + writeStatusIfMine(jobID, 'done') |
111 | 164 | else: |
112 | | - writeStatus(jobID, 'terminated') |
| 165 | + msg("Terminated, unfinished") |
| 166 | + writeStatusIfMine(jobID, 'terminated') |
113 | 167 | |
114 | 168 | elif type == "finish": |
115 | 169 | writeStatus(jobID, 'running') |
116 | | - print wiki + ' finishing' |
117 | | - finishWiki(outputFile, wiki) |
118 | | - writeStatus(jobID, 'done') |
| 170 | + finishWiki(outputFile, lang) |
| 171 | + writeStatusIfMine(jobID, 'done') |
119 | 172 | else: |
120 | 173 | if not waiting: |
121 | | - print "Waiting..." |
| 174 | + msg("Waiting...") |
122 | 175 | waiting = True |
123 | 176 | time.sleep(5) |
124 | 177 | |
Index: trunk/tools/dumpHTML/do-edition |
— | — | @@ -11,12 +11,11 @@ |
12 | 12 | edition = sys.argv[1] |
13 | 13 | |
14 | 14 | threads = { |
15 | | - "localhost": 1 |
16 | | - #"srv42": 4, |
17 | | - #"srv122": 2, |
18 | | - #"srv123": 2, |
19 | | - #"srv124": 2, |
20 | | - #"srv125": 2 |
| 15 | + "localhost": 4, |
| 16 | + "srv122": 2, |
| 17 | + "srv123": 2, |
| 18 | + "srv124": 2, |
| 19 | + "srv125": 2 |
21 | 20 | } |
22 | 21 | |
23 | 22 | # Create some directories |
Index: trunk/tools/dumpHTML/compress-html |
— | — | @@ -34,7 +34,7 @@ |
35 | 35 | rm -f $dest/wikipedia-$lang-html.7z |
36 | 36 | |
37 | 37 | # Set chunk size to 8MB for faster random access |
38 | | -7z -l -ms8m a $dest/wikipedia-$lang-html.7z @$dest/html.lst @$dest/skins.lst |
| 38 | +7za -l -ms8m a $dest/wikipedia-$lang-html.7z @$dest/html.lst @$dest/skins.lst |
39 | 39 | |
40 | 40 | #if [ ! -e $dest/wikipedia-$lang-reduced.7z ];then |
41 | 41 | # echo |
Index: trunk/tools/dumpHTML/queueController.php |
— | — | @@ -1,6 +1,6 @@ |
2 | 2 | <?php |
3 | 3 | |
4 | | -$basedir = '/mnt/static'; |
| 4 | +$baseDir = '/mnt/static'; |
5 | 5 | |
6 | 6 | $wgNoDBParam = true; |
7 | 7 | require_once( '/home/wikipedia/common/php/maintenance/commandLine.inc' ); |
— | — | @@ -24,7 +24,7 @@ |
25 | 25 | fgets( $queueSock ); |
26 | 26 | |
27 | 27 | # Fetch wiki stats |
28 | | -$wikiSizes = @file_get_contents( "$basedir/checkpoints/wikiSizes" ); |
| 28 | +$wikiSizes = @file_get_contents( "$baseDir/checkpoints/wikiSizes" ); |
29 | 29 | if ( $wikiSizes ) { |
30 | 30 | $wikiSizes = unserialize( $wikiSizes ); |
31 | 31 | } else { |
— | — | @@ -38,7 +38,7 @@ |
39 | 39 | |
40 | 40 | $wikiSizes[$wiki] = $db->selectField( "`$wiki`.site_stats", 'ss_total_pages' ); |
41 | 41 | } |
42 | | - file_put_contents( "$basedir/checkpoints/wikiSizes", serialize( $wikiSizes ) ); |
| 42 | + file_put_contents( "$baseDir/checkpoints/wikiSizes", serialize( $wikiSizes ) ); |
43 | 43 | } |
44 | 44 | |
45 | 45 | # Compute job array |
— | — | @@ -59,6 +59,7 @@ |
60 | 60 | $jobs[] = array( |
61 | 61 | 'id' => $jobID, |
62 | 62 | 'cmd' => "$jobID $wiki articles $i/$numJobs", |
| 63 | + 'wiki'=> $wiki, |
63 | 64 | 'trigger' => $trigger |
64 | 65 | ); |
65 | 66 | } |
— | — | @@ -74,6 +75,7 @@ |
75 | 76 | 'id' => $jobID, |
76 | 77 | 'gate' => "$wiki articles", |
77 | 78 | 'cmd' => "$jobID $wiki shared $i/$numSharedJobs", |
| 79 | + 'wiki' => $wiki, |
78 | 80 | 'trigger' => $trigger |
79 | 81 | ); |
80 | 82 | } |
— | — | @@ -84,14 +86,15 @@ |
85 | 87 | 'id' => $jobID, |
86 | 88 | 'gate' => "$wiki shared", |
87 | 89 | 'cmd' => "$jobID $wiki finish 1/1", |
| 90 | + 'wiki' => $wiki, |
88 | 91 | 'trigger' => 'everything', |
89 | 92 | ); |
90 | 93 | } |
91 | 94 | |
92 | 95 | # Write job list |
93 | | -$file = fopen( "$basedir/jobs/list", 'w' ); |
| 96 | +$file = fopen( "$baseDir/jobs/list", 'w' ); |
94 | 97 | if ( !$file ) { |
95 | | - print "Unable to open $basedir/jobs/list for writing\n"; |
| 98 | + print "Unable to open $baseDir/jobs/list for writing\n"; |
96 | 99 | exit( 1 ); |
97 | 100 | } |
98 | 101 | foreach ( $jobs as $job ) { |
— | — | @@ -144,11 +147,13 @@ |
145 | 148 | $queueing = true; |
146 | 149 | } elseif ( isTerminated( $job ) ) { |
147 | 150 | print "Job $i died, requeueing: {$job['cmd']}\n"; |
| 151 | + removeJobStatus( $job ); |
148 | 152 | $queueing = true; |
149 | 153 | } else { |
150 | 154 | $queueing = false; |
151 | 155 | } |
152 | 156 | if ( $queueing ) { |
| 157 | + $wiki = $job['wiki']; |
153 | 158 | if ( !isset( $initialisedWikis[$wiki] ) ) { |
154 | 159 | startWiki( $wiki ); |
155 | 160 | $initialisedWikis[$wiki] = true; |
— | — | @@ -179,9 +184,10 @@ |
180 | 185 | } |
181 | 186 | |
182 | 187 | function getJobStatus( $job ) { |
183 | | - global $basedir; |
184 | | - $jobStatusFile = "$basedir/jobs/{$job['id']}"; |
185 | | - $lines = @file( $jobCpFile ); |
| 188 | + global $baseDir; |
| 189 | + $jobStatusFile = "$baseDir/jobs/{$job['id']}"; |
| 190 | + $lines = @file( $jobStatusFile ); |
| 191 | + |
186 | 192 | if ( !isset( $lines[1] ) ) { |
187 | 193 | return false; |
188 | 194 | } else { |
— | — | @@ -189,6 +195,12 @@ |
190 | 196 | } |
191 | 197 | } |
192 | 198 | |
| 199 | +function removeJobStatus( $job ) { # Delete the status file $baseDir/jobs/<job id> |
| 200 | + global $baseDir; |
| 201 | + $jobStatusFile = "$baseDir/jobs/{$job['id']}"; |
| 202 | + @unlink( $jobStatusFile ); # @ suppresses the warning if unlink fails |
| 203 | +} |
| 204 | + |
193 | 205 | function isDone( $job ) { # True when getJobStatus() loosely equals 'done' |
194 | 206 | return getJobStatus( $job ) == 'done'; |
195 | 207 | } |
— | — | @@ -199,7 +211,7 @@ |
200 | 212 | |
201 | 213 | function enqueue( $job ) { |
202 | 214 | global $queueSock; |
203 | | - if ( false === fwrite( $queueSock, "enq $job\n" ) ) { |
| 215 | + if ( false === fwrite( $queueSock, "enq {$job['cmd']}\n" ) ) { |
204 | 216 | die( "Unable to write to queue server\n" ); |
205 | 217 | } |
206 | 218 | |
— | — | @@ -208,10 +220,10 @@ |
209 | 221 | } |
210 | 222 | |
211 | 223 | function startWiki( $wiki ) { # Run $baseDir/scripts/start-lang for the language derived from $wiki |
212 | | - global $basedir; |
| 224 | + global $baseDir; |
213 | 225 | $lang = str_replace( 'wiki', '', $wiki ); # language code = wiki name with every 'wiki' substring removed |
214 | 226 | print "Starting language $lang\n"; |
215 | | - passthru( "$basedir/scripts/start-lang $lang" ); |
| 227 | + passthru( "$baseDir/scripts/start-lang $lang" ); |
216 | 228 | } |
217 | 229 | |
218 | 230 | ?> |