r17779 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: < r17778 | r17779 | r17780 >
Date: 05:27, 18 November 2006
Author: tstarling
Status: old
Tags:
Comment:
* Fixed the new gated job control system.
* Fixed a problem with queueSlave output being buffered, added flush() calls.
* A few other tweaks.
Modified paths:
  • /trunk/tools/dumpHTML/compress-html (modified) (history)
  • /trunk/tools/dumpHTML/do-edition (modified) (history)
  • /trunk/tools/dumpHTML/queueController.php (modified) (history)
  • /trunk/tools/dumpHTML/queueSlave (modified) (history)

Diff [purge]

Index: trunk/tools/dumpHTML/queueSlave
@@ -10,11 +10,12 @@
1111 os.dup2(fd,2)
1212
1313 def dumpHTML(outputFile, *params):
14 - fullParams = ["nice", "nice", "-n15", "php","-n","dumpHTML.php"].extend(params)
 14+ fullParams = ["nice", "-n15", "php","-n","dumpHTML.php"]
 15+ fullParams.extend(params)
1516 pid = os.fork()
1617 if pid == 0:
1718 redirectOutput(outputFile)
18 - os.execvp(params)
 19+ os.execvp("nice", fullParams)
1920 sys.exit(1)
2021
2122 # Wait for the child to exit (or the parent)
@@ -29,37 +30,86 @@
3031
3132 def finishWiki(outputFile, lang):
3233 global edition, siteDir, baseDir
33 - if (os.path.isdir(siteDir+"/"+lang+"-new")):
34 - print "Already compressed "+lang+"\n"
 34+ if (not os.path.isdir(siteDir+"/"+lang+"-new")):
 35+ msg("Already compressed "+lang)
3536 return
36 - print "Finishing language "+lang+"\n"
37 - os.system("%(baseDir)/scripts/finish-lang %(lang) %(edition) 2>&1 >> %(outputFile)" % {
 37+ msg("Finishing language "+lang)
 38+ os.system("%(baseDir)s/scripts/finish-lang %(lang)s %(edition)s 2>&1 >> %(outputFile)s" % {
3839 'baseDir' : baseDir, 'lang' : lang, 'edition' : edition, 'outputFile': outputFile })
 40+ msg("Done")
3941
4042 def writeStatus(jobID, status):
4143 global baseDir
42 - f = open(basedir+"/jobs/"+jobID, "w")
 44+ f = open(baseDir+"/jobs/"+jobID, "w")
4345 print >> f, socket.gethostname(), os.getpid()
4446 print >> f, status
4547 f.close()
4648
 49+def isStatusMine(jobID):
 50+ global baseDir
 51+ try:
 52+ f = open(baseDir+"/jobs/"+jobID, "r")
 53+ except:
 54+ msg("Status file is missing")
 55+ return False
 56+
 57+ fields = f.readline().split()
 58+ f.close()
 59+ if len(fields) != 2:
 60+ msg("Warning: invalid status file")
 61+ return False
 62+
 63+ if fields[0] == socket.gethostname() and fields[1] == str(os.getpid()):
 64+ return True
 65+ else:
 66+ return False
 67+
4768 def isDone(checkpoint, jobType):
4869 test = jobType+'=done'
49 - f = open(checkpoint, "r")
 70+ try: f = open(checkpoint, "r")
 71+ except:
 72+ return False
5073 try:
5174 for line in f:
5275 if line.rstrip() == test:
53 - return true
 76+ return True
5477 finally:
5578 f.close()
 79+ return False
5680
 81+def writeStatusIfMine(jobID, status):
 82+ if isStatusMine(jobID):
 83+ writeStatus(jobID, status)
 84+ else:
 85+ msg("Not overwriting status file, it doesn't belong to me.")
5786
 87+def msg(*params):
 88+ print " ".join(params)
 89+ sys.stdout.flush()
 90+
 91+#---------------------------------------------------------------------------------
 92+
 93+hostname = socket.gethostname()
 94+myPid = os.getpid()
 95+
 96+msg("queueSlave on %s %d" % (hostname, myPid))
 97+
5898 queueHost = sys.argv[1]
5999 queuePort = int(sys.argv[2])
60100 baseDir = sys.argv[3]
61101 edition = sys.argv[4]
62102 siteDir = baseDir+"/wikipedia"
 103+logDir = baseDir+"/logs"
 104+jobDir = baseDir+"/jobs"
 105+checkpointDir = baseDir+"/checkpoints"
63106
 107+try: os.makedirs(logDir)
 108+except: pass
 109+try: os.makedirs(jobDir)
 110+except: pass
 111+try: os.makedirs(checkpointDir)
 112+except: pass
 113+
64114 queueSock = socket.socket()
65115 queueSock.connect((queueHost, queuePort))
66116 queueFile = queueSock.makefile()
@@ -84,40 +134,43 @@
85135 lang = wiki.replace( 'wiki', '' )
86136 dest = siteDir+"/"+lang+"-new"
87137 jobString = wiki+"_" + type + "_" + slice.replace( '/', '_' )
88 - outputFile = baseDir+"/logs/"+jobString
89 - checkpoint = baseDir+"/checkpoints/"+jobString
 138+ outputFile = logDir+"/"+jobString
 139+ checkpoint = checkpointDir+"/"+jobString
90140
91141 if type == "articles":
92142 writeStatus(jobID, 'running')
93 - print wiki + ' articles ' + slice
94 - dumpHTML(outputFile, wiki,"--no-shared-desc", #"--force-copy",
95 - "--image-snapshot","--interlang","-d",dest,"--slice",slice,
 143+ msg(wiki + ' articles ' + slice)
 144+ dumpHTML(outputFile, wiki,"--no-shared-desc", #"--force-copy", "--image-snapshot",
 145+ "--interlang","-d",dest,"--slice",slice,
96146 "--checkpoint",checkpoint,"--no-overwrite")
97147
98148 if isDone(checkpoint, 'everything'):
99 - writeStatus(jobID, 'done')
 149+ msg("Done")
 150+ writeStatusIfMine(jobID, 'done')
100151 else:
101 - writeStatus(jobID, 'terminated')
 152+ msg("Terminated, unfinished")
 153+ writeStatusIfMine(jobID, 'terminated')
102154
103155 elif type == "shared":
104156 writeStatus(jobID, 'running')
105 - print wiki + ' shared ' + slice
106 - dumpHTML(outputFile, wiki,"--shared-desc", #"--force-copy",
107 - "--image-snapshot","--interlang","-d",dest,"--slice",slice,
 157+ msg(wiki + ' shared ' + slice)
 158+ dumpHTML(outputFile, wiki,"--shared-desc", #"--force-copy", "--image-snapshot",
 159+ "--interlang","-d",dest,"--slice",slice,
108160 "--checkpoint",checkpoint,"--no-overwrite")
109161 if isDone(checkpoint, 'shared image'):
110 - writeStatus(jobID, 'done')
 162+ msg("Done")
 163+ writeStatusIfMine(jobID, 'done')
111164 else:
112 - writeStatus(jobID, 'terminated')
 165+ msg("Terminated, unfinished")
 166+ writeStatusIfMine(jobID, 'terminated')
113167
114168 elif type == "finish":
115169 writeStatus(jobID, 'running')
116 - print wiki + ' finishing'
117 - finishWiki(outputFile, wiki)
118 - writeStatus(jobID, 'done')
 170+ finishWiki(outputFile, lang)
 171+ writeStatusIfMine(jobID, 'done')
119172 else:
120173 if not waiting:
121 - print "Waiting..."
 174+ msg("Waiting...")
122175 waiting = True
123176 time.sleep(5)
124177
Index: trunk/tools/dumpHTML/do-edition
@@ -11,12 +11,11 @@
1212 edition = sys.argv[1]
1313
1414 threads = {
15 - "localhost": 1
16 - #"srv42": 4,
17 - #"srv122": 2,
18 - #"srv123": 2,
19 - #"srv124": 2,
20 - #"srv125": 2
 15+ "localhost": 4,
 16+ "srv122": 2,
 17+ "srv123": 2,
 18+ "srv124": 2,
 19+ "srv125": 2
2120 }
2221
2322 # Create some directories
Index: trunk/tools/dumpHTML/compress-html
@@ -34,7 +34,7 @@
3535 rm -f $dest/wikipedia-$lang-html.7z
3636
3737 # Set chunk size to 8MB for faster random access
38 -7z -l -ms8m a $dest/wikipedia-$lang-html.7z @$dest/html.lst @$dest/skins.lst
 38+7za -l -ms8m a $dest/wikipedia-$lang-html.7z @$dest/html.lst @$dest/skins.lst
3939
4040 #if [ ! -e $dest/wikipedia-$lang-reduced.7z ];then
4141 # echo
Index: trunk/tools/dumpHTML/queueController.php
@@ -1,6 +1,6 @@
22 <?php
33
4 -$basedir = '/mnt/static';
 4+$baseDir = '/mnt/static';
55
66 $wgNoDBParam = true;
77 require_once( '/home/wikipedia/common/php/maintenance/commandLine.inc' );
@@ -24,7 +24,7 @@
2525 fgets( $queueSock );
2626
2727 # Fetch wiki stats
28 -$wikiSizes = @file_get_contents( "$basedir/checkpoints/wikiSizes" );
 28+$wikiSizes = @file_get_contents( "$baseDir/checkpoints/wikiSizes" );
2929 if ( $wikiSizes ) {
3030 $wikiSizes = unserialize( $wikiSizes );
3131 } else {
@@ -38,7 +38,7 @@
3939
4040 $wikiSizes[$wiki] = $db->selectField( "`$wiki`.site_stats", 'ss_total_pages' );
4141 }
42 - file_put_contents( "$basedir/checkpoints/wikiSizes", serialize( $wikiSizes ) );
 42+ file_put_contents( "$baseDir/checkpoints/wikiSizes", serialize( $wikiSizes ) );
4343 }
4444
4545 # Compute job array
@@ -59,6 +59,7 @@
6060 $jobs[] = array(
6161 'id' => $jobID,
6262 'cmd' => "$jobID $wiki articles $i/$numJobs",
 63+ 'wiki'=> $wiki,
6364 'trigger' => $trigger
6465 );
6566 }
@@ -74,6 +75,7 @@
7576 'id' => $jobID,
7677 'gate' => "$wiki articles",
7778 'cmd' => "$jobID $wiki shared $i/$numSharedJobs",
 79+ 'wiki' => $wiki,
7880 'trigger' => $trigger
7981 );
8082 }
@@ -84,14 +86,15 @@
8587 'id' => $jobID,
8688 'gate' => "$wiki shared",
8789 'cmd' => "$jobID $wiki finish 1/1",
 90+ 'wiki' => $wiki,
8891 'trigger' => 'everything',
8992 );
9093 }
9194
9295 # Write job list
93 -$file = fopen( "$basedir/jobs/list", 'w' );
 96+$file = fopen( "$baseDir/jobs/list", 'w' );
9497 if ( !$file ) {
95 - print "Unable to open $basedir/jobs/list for writing\n";
 98+ print "Unable to open $baseDir/jobs/list for writing\n";
9699 exit( 1 );
97100 }
98101 foreach ( $jobs as $job ) {
@@ -144,11 +147,13 @@
145148 $queueing = true;
146149 } elseif ( isTerminated( $job ) ) {
147150 print "Job $i died, requeueing: {$job['cmd']}\n";
 151+ removeJobStatus( $job );
148152 $queueing = true;
149153 } else {
150154 $queueing = false;
151155 }
152156 if ( $queueing ) {
 157+ $wiki = $job['wiki'];
153158 if ( !isset( $initialisedWikis[$wiki] ) ) {
154159 startWiki( $wiki );
155160 $initialisedWikis[$wiki] = true;
@@ -179,9 +184,10 @@
180185 }
181186
182187 function getJobStatus( $job ) {
183 - global $basedir;
184 - $jobStatusFile = "$basedir/jobs/{$job['id']}";
185 - $lines = @file( $jobCpFile );
 188+ global $baseDir;
 189+ $jobStatusFile = "$baseDir/jobs/{$job['id']}";
 190+ $lines = @file( $jobStatusFile );
 191+
186192 if ( !isset( $lines[1] ) ) {
187193 return false;
188194 } else {
@@ -189,6 +195,12 @@
190196 }
191197 }
192198
 199+function removeJobStatus( $job ) {
 200+ global $baseDir;
 201+ $jobStatusFile = "$baseDir/jobs/{$job['id']}";
 202+ @unlink( $jobStatusFile );
 203+}
 204+
193205 function isDone( $job ) {
194206 return getJobStatus( $job ) == 'done';
195207 }
@@ -199,7 +211,7 @@
200212
201213 function enqueue( $job ) {
202214 global $queueSock;
203 - if ( false === fwrite( $queueSock, "enq $job\n" ) ) {
 215+ if ( false === fwrite( $queueSock, "enq {$job['cmd']}\n" ) ) {
204216 die( "Unable to write to queue server\n" );
205217 }
206218
@@ -208,10 +220,10 @@
209221 }
210222
211223 function startWiki( $wiki ) {
212 - global $basedir;
 224+ global $baseDir;
213225 $lang = str_replace( 'wiki', '', $wiki );
214226 print "Starting language $lang\n";
215 - passthru( "$basedir/scripts/start-lang $lang" );
 227+ passthru( "$baseDir/scripts/start-lang $lang" );
216228 }
217229
218230 ?>