r23293 MediaWiki - Code Review archive

Repository: MediaWiki
Revision: r23292 | r23293 | r23294
Date: 11:24, 23 June 2007
Author: rainman
Status: old
Tags:
Comment:
* Make single-index searcher pool size configurable per host
* Clean up unused/experimental stuff (e.g. RecentUpdatesDaemon)
* Drop the checkout/release mechanism for searchers; force close
  after 15s if the searcher is updated
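
Editorial note: the third point replaces reference counting (checkout/release) with a timed close. When an index is swapped in, each superseded searcher is handed to a DeferredClose thread (see the SearcherCache diff below). The DeferredClose class itself is not part of this diff; judging only from its call site, new DeferredClose(s,15000).start(), a minimal sketch might look like:

    /** Hypothetical sketch inferred from the call site in SearcherCache;
     *  not the actual DeferredClose class, which predates this revision. */
    class DeferredClose extends Thread {
        protected org.apache.lucene.search.Searchable s;
        protected long delay;

        DeferredClose(org.apache.lucene.search.Searchable s, long delay) {
            this.s = s;
            this.delay = delay;
        }

        public void run() {
            try {
                Thread.sleep(delay); // 15000 ms at the call site: let in-flight searches drain
                s.close();           // then force-close the superseded searcher
            } catch (Exception e) {
                // the searcher is being discarded anyway; log and move on
            }
        }
    }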
Modified paths:
  • /trunk/lucene-search-2.0/lsearch.conf (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/analyzers/WikiQueryParser.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/index/IndexThread.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/SearchEngine.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/SearcherCache.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/UpdateThread.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/WikiSearcher.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/test/WikiQueryParserTest.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/util/Localization.java (modified) (history)
  • /trunk/lucene-search-2.0/src/org/wikimedia/lsearch/util/RecentUpdatesDaemon.java (deleted) (history)

Diff

Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/test/WikiQueryParserTest.java
@@ -34,7 +34,6 @@
3535 Configuration.setConfigFile(System.getProperty("user.dir")+"/test-data/mwsearch.conf.test");
3636 Configuration.open();
3737 WikiQueryParser.TITLE_BOOST = 2;
38 - WikiQueryParser.REDIRECT_BOOST = 0.2f;
3938 WikiQueryParser.ALT_TITLE_BOOST = 6;
4039 WikiQueryParser.KEYWORD_BOOST = 0.05f;
4140 WikiIndexModifier.ALT_TITLES = 3;
@@ -173,7 +172,7 @@
174173 assertEquals("+(+(contents:beans contents:bean^0.5) +category:food) +(+contents:orchid +category:\"some flowers\")",q.toString());
175174
176175 q = parser.parseRaw("(Beans AND incategory:FOod) (orchID AND incategory:\"some FLOWERS\")");
177 - assertEquals("+(+(contents:beans contents:bean^0.5) +category:FOod) +(+contents:orchid +category:\"some FLOWERS\")",q.toString());
 176+ assertEquals("+(+(contents:beans contents:bean^0.5) +category:food) +(+contents:orchid +category:\"some flowers\")",q.toString());
178177
179178 q = parser.parse("(beans AND incategory:food) (orchid AND incategory:\"some flowers\")");
180179 assertEquals("+(+(+(contents:beans contents:bean^0.5) title:beans^2.0) +category:food) +(+(+contents:orchid +category:\"some flowers\") title:orchid^2.0)",q.toString());
@@ -341,6 +340,12 @@
342341 q = parser.parseTwoPass("all_talk: beans everyone",NamespacePolicy.REWRITE);
343342 assertEquals("(+(namespace:1 namespace:3 namespace:5 namespace:7 namespace:9 namespace:11 namespace:13 namespace:15) +(+(contents:beans contents:bean^0.5) +(contents:everyone contents:everyon^0.5))) (+(namespace:1 namespace:3 namespace:5 namespace:7 namespace:9 namespace:11 namespace:13 namespace:15) +(+title:beans^2.0 +title:everyone^2.0))",q.toString());
344343
 344+ // German
 345+ analyzer = Analyzers.getSearcherAnalyzer("de");
 346+ bs = new FieldBuilder("de").getBuilder();
 347+ parser = new WikiQueryParser(bs.getFields().contents(),"0",analyzer,bs,NamespacePolicy.IGNORE);
 348+ q = parser.parseTwoPass("welche rolle spielen Mineralstoffe in der Ernährung?",NamespacePolicy.IGNORE);
 349+ assertEquals("(+(contents:welche contents:welch^0.5) +(contents:rolle contents:roll^0.5) +(contents:spielen contents:spiel^0.5) +(contents:mineralstoffe contents:mineralstoff^0.5) +contents:in +contents:der +(+(contents:ernahrung contents:ernahr^0.5) (contents:ernaehrung contents:ernaehr^0.5))) (+title:welche^2.0 +title:rolle^2.0 +title:spielen^2.0 +title:mineralstoffe^2.0 +title:in^2.0 +title:der^2.0 +(title:ernahrung^2.0 title:ernaehrung^2.0))",q.toString());
345350
346351 // Test field extraction
347352 HashSet<NamespaceFilter> fs = parser.getFieldNamespaces("main:something [1]:else all:oh []:nja");
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/SearcherCache.java
@@ -19,6 +19,7 @@
2020 import org.apache.lucene.search.Searchable;
2121 import org.apache.lucene.search.SearchableMul;
2222 import org.wikimedia.lsearch.beans.SearchHost;
 23+import org.wikimedia.lsearch.config.Configuration;
2324 import org.wikimedia.lsearch.config.GlobalConfiguration;
2425 import org.wikimedia.lsearch.config.IndexId;
2526 import org.wikimedia.lsearch.config.IndexRegistry;
@@ -57,16 +58,15 @@
5859 log.warn("I/O error closing searchables "+s);
5960 }
6061 }
61 - }
 62+ }
6263
63 - public static final int OPEN_SEARCHERS = 4;
64 -
6564 /** Holds OPEN_SEARCHERS num of index searchers, for multiprocessor workstations */
6665 public static class SearcherPool {
67 - IndexSearcherMul searchers[] = new IndexSearcherMul[OPEN_SEARCHERS];
 66+ IndexSearcherMul searchers[];
6867
69 - SearcherPool(IndexId iid, String path) throws IOException {
70 - for(int i=0;i<OPEN_SEARCHERS;i++){
 68+ SearcherPool(IndexId iid, String path, int poolsize) throws IOException {
 69+ searchers = new IndexSearcherMul[poolsize];
 70+ for(int i=0;i<poolsize;i++){
7171 searchers[i] = open(iid, path);
7272 }
7373 }
@@ -105,6 +105,8 @@
106106 /** searchable -> host */
107107 protected Hashtable<Searchable,String> searchableHost;
108108
 109+ public int searchPoolSize = 1;
 110+
109111 /** lazy initalization of search indexes (set of dbroles) */
110112 protected Set<String> initialized;
111113
@@ -112,11 +114,6 @@
113115 * update via NetworkStatusThread */
114116 protected Set<SearchHost> deadHosts;
115117
116 - /** Instances of searchables in use by some searcher */
117 - protected Hashtable<Searchable,SimpleInt> inUse;
118 - /** Searchables that should be closed after all searchers are done with them */
119 - protected HashSet<Searchable> closeQueue;
120 -
121118 static protected SearcherCache instance = null;
122119
123120 protected Object lock;
@@ -212,14 +209,22 @@
213210 try {
214211 if(iid.isLogical())
215212 continue;
216 - IndexSearcherMul is = getLocalSearcher(iid);
217 - Warmup.warmupIndexSearcher(is,iid,false);
 213+ for(IndexSearcherMul is : getSearcherPool(iid))
 214+ Warmup.warmupIndexSearcher(is,iid,false);
218215 } catch (IOException e) {
219216 log.warn("I/O error warming index for "+iid);
220217 }
221218 }
222219 }
223 -
 220+
 221+ /** Get all searchers for iid, open/create if doesn't exist */
 222+ private IndexSearcherMul[] getSearcherPool(IndexId iid) throws IOException {
 223+ SearcherPool pool = localCache.get(iid.toString());
 224+ if(pool == null)
 225+ addLocalSearcherToCache(iid);
 226+ return localCache.get(iid.toString()).searchers;
 227+ }
 228+
224229 /**
225230 * Make a searchable instance, and add it to cache
226231 * @return the created searchable instance
@@ -254,7 +259,11 @@
255260 synchronized(iid){
256261 // make sure some other thread has not opened the searcher
257262 if(localCache.get(iid.toString()) == null){
258 - SearcherPool pool = new SearcherPool(iid,iid.getCanonicalSearchPath());
 263+ if(!iid.isMySearch())
 264+ throw new IOException(iid+" is not searched by this host.");
 265+ if(iid.isLogical())
 266+ throw new IOException(iid+" will not open logical index.");
 267+ SearcherPool pool = new SearcherPool(iid,iid.getCanonicalSearchPath(),searchPoolSize);
259268 localCache.put(iid.toString(),pool);
260269 for(IndexSearcherMul s : pool.searchers)
261270 searchableHost.put(s,"");
@@ -392,9 +401,7 @@
393402 * @param searcher
394403 */
395404 public IndexSearcherMul[] invalidateLocalSearcher(IndexId iid, SearcherPool newpool) {
396 - IndexSearcherMul olds;
397405 log.debug("Invalidating local searcher for "+iid);
398 - ArrayList<SearchableMul> close = new ArrayList<SearchableMul>();
399406 synchronized(lock){
400407 SearcherPool oldpool = localCache.get(iid.toString());
401408 // put in the new value
@@ -405,67 +412,14 @@
406413 return newpool.searchers; // no old searcher
407414 for(IndexSearcherMul s : oldpool.searchers){
408415 searchableHost.remove(s);
409 - // close the old index searcher (imediatelly, or queue if in use)
410 - SimpleInt useCount = inUse.get(s);
411 - if(useCount == null || useCount.count == 0)
412 - close.add(s);
413 - else{
414 - log.debug("Searcher for "+iid+" will be closed after local searches are done.");
415 - }
416 -
417 - closeQueue.add(s);
 416+ // deferred close
 417+ log.debug("Deferred closure of searcher "+s);
 418+ new DeferredClose(s,15000).start();
418419 }
419420 }
420 - // close outside of sync block
421 - for(SearchableMul s : close)
422 - closeSearcher(s);
423 -
424421 return newpool.searchers;
425422 }
426 -
427 - /** Tell the cache that the searcher is in use */
428 - public void checkout(SearchableMul searcher) {
429 - synchronized(lock){
430 - SimpleInt i = inUse.get(searcher);
431 - if(i == null)
432 - inUse.put(searcher,new SimpleInt(1));
433 - else
434 - i.count++;
435 - }
436 - }
437 -
438 - /** Tell the cache that the searcher is no longer in use */
439 - public void release(SearchableMul searcher) {
440 - synchronized(lock){
441 - SimpleInt i = inUse.get(searcher);
442 - if(i == null)
443 - log.warn("Error in returnBack, returned Searcher that is not checked out. "+searcher);
444 - else
445 - i.count--;
446 - }
447 -
448 - closeSearcher(searcher);
449 - }
450423
451 - protected void closeSearcher(SearchableMul searcher){
452 - boolean close = false;
453 - synchronized(lock){
454 - SimpleInt used = inUse.get(searcher);
455 - if(used == null || used.count == 0){
456 - if(closeQueue.contains(searcher)){
457 - closeQueue.remove(searcher);
458 - searchableHost.remove(searcher);
459 - close = true;
460 - }
461 - }
462 - }
463 - // deferred close
464 - if(close){
465 - log.debug("Deferred closure of searcher "+searcher);
466 - new DeferredClose(searcher,15000).start();
467 - }
468 - }
469 -
470424 /** Get a copy of array of dead hosts */
471425 public HashSet<SearchHost> getDeadHosts(){
472426 synchronized(lock){
@@ -488,15 +442,18 @@
489443 localCache = new Hashtable<String,SearcherPool>();
490444 deadHosts = Collections.synchronizedSet(new HashSet<SearchHost>());
491445 global = GlobalConfiguration.getInstance();
492 - inUse = new Hashtable<Searchable,SimpleInt>();
493 - closeQueue = new HashSet<Searchable>();
494446 searchableHost = new Hashtable<Searchable,String>();
495447 remoteKeys = new Hashtable<String,Set<String>>();
496448 lock = new Object();
497449 initialized = Collections.synchronizedSet(new HashSet<String>());
 450+ searchPoolSize = Configuration.open().getInt("SearcherPool","size",1);
498451 }
499452
 453+ public int getSearchPoolSize() {
 454+ return searchPoolSize;
 455+ }
500456
 457+
501458
502459
503460 }
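
Editorial note: in sum, the fixed OPEN_SEARCHERS constant is gone and pool size now comes from configuration, defaulting to one. A minimal sketch of the resulting flow, using only calls visible in this diff:

    // lsearch.conf: SearcherPool.size=3 (see the lsearch.conf hunk below)
    int poolSize = Configuration.open().getInt("SearcherPool", "size", 1);
    // each locally searched index gets poolSize parallel IndexSearcherMul instances
    SearcherPool pool = new SearcherPool(iid, iid.getCanonicalSearchPath(), poolSize);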
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/WikiSearcher.java
@@ -100,7 +100,6 @@
101101 if(searcher == null)
102102 throw new Exception("Error constructing searcher, check logs.");
103103
104 - cache.checkout(searcher);
105104 }
106105
107106 /** Got host for the iid within this multi searcher */
@@ -114,8 +113,6 @@
115114
116115 @Override
117116 public void close() throws IOException {
118 - cache.release(searcher);
119 -
120117 }
121118
122119 @Override
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/UpdateThread.java
@@ -208,7 +208,7 @@
209209 searchpath.mkdir();
210210
211211 // check if updated index is a valid one (throws an exception on error)
212 - SearcherCache.SearcherPool pool = new SearcherCache.SearcherPool(iid,li.path);
 212+ SearcherCache.SearcherPool pool = new SearcherCache.SearcherPool(iid,li.path,cache.getSearchPoolSize());
213213
214214 // refresh the symlink
215215 command = "/bin/rm -rf "+iid.getSearchPath();
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/SearchEngine.java
@@ -105,7 +105,7 @@
106106 NamespaceFilterWrapper localfilter = filter;
107107 if(iid.isMainsplit() && iid.isMainPart())
108108 localfilter = null;
109 - else if(iid.isNssplit() && !iid.isLogical() && iid.getNamespaceSet().size()==1)
 109+ else if(iid.isNssplit() && !iid.isLogical() && iid.getNamespaceSet().size()==1 && !iid.getNamespaceSet().contains("<default>"))
110110 localfilter = null;
111111 if(localfilter != null)
112112 log.info("Using local filter: "+localfilter);
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/index/IndexThread.java
@@ -321,7 +321,7 @@
322322 IndexReader reader = IndexReader.open(iid.getImportPath());
323323 if(!reader.isOptimized()){
324324 reader.close();
325 - log.debug("Optimizing "+iid);
 325+ log.info("Optimizing "+iid);
326326 long start = System.currentTimeMillis();
327327 Transaction trans = new Transaction(iid);
328328 trans.begin();
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/analyzers/WikiQueryParser.java
@@ -21,6 +21,7 @@
2222 import org.apache.lucene.search.spans.SpanNearQuery;
2323 import org.apache.lucene.search.spans.SpanQuery;
2424 import org.apache.lucene.search.spans.SpanTermQuery;
 25+import org.mediawiki.importer.ExactListFilter;
2526 import org.wikimedia.lsearch.config.GlobalConfiguration;
2627 import org.wikimedia.lsearch.index.WikiIndexModifier;
2728 import org.wikimedia.lsearch.search.NamespaceFilter;
@@ -78,7 +79,6 @@
7980 public static float TITLE_ALIAS_BOOST = 0.2f;
8081 public static float STEM_TITLE_BOOST = 2;
8182 public static float STEM_TITLE_ALIAS_BOOST = 0.4f;
82 - public static float REDIRECT_BOOST = 0.2f;
8383 public static float ALT_TITLE_BOOST = 2;
8484 public static float ALT_TITLE_ALIAS_BOOST = 0.4f;
8585 public static float KEYWORD_BOOST = 0.02f;
@@ -296,7 +296,7 @@
297297 continue; // ignore whitespaces
298298
299299 // pluses and minuses, underscores can be within words, *,? are for wildcard queries
300 - if(Character.isLetterOrDigit(ch) || ch=='-' || ch=='+' || ch=='_' || ch=='*' || ch=='?'){
 300+ if(Character.isLetterOrDigit(ch) || ch=='-' || ch=='+' || ch=='_' || ch=='*'){
301301 buffer[length++] = ch;
302302 } else{
303303 cur--; // position before the nonletter character
@@ -396,13 +396,13 @@
397397 /** Make a lucene term from string */
398398 private Term makeTerm(String t){
399399 if(field == null)
400 - return new Term(defaultField,t);
 400+ return new Term(defaultField,builder.isExactCase()? t : t.toLowerCase());
401401 else if(!field.equals("incategory") &&
402402 (namespacePolicy == NamespacePolicy.IGNORE ||
403403 namespacePolicy == NamespacePolicy.REWRITE))
404404 return new Term(defaultField,t);
405405 else if(field.equals("incategory"))
406 - return new Term("category",t);
 406+ return new Term("category",builder.isExactCase()? t : t.toLowerCase());
407407 else
408408 return new Term(field,t);
409409 }
@@ -664,7 +664,7 @@
665665
666666 // check for wildcard seaches, they are also not analyzed/stemmed, only for titles
667667 // wildcard signs are allowed only at the end of the word, minimum one letter word
668 - if(length>1 && Character.isLetter(buffer[0]) && (buffer[length-1]=='*' || buffer[length-1]=='?') &&
 668+ if(length>1 && Character.isLetter(buffer[0]) && buffer[length-1]=='*' &&
669669 defaultField.equals(fields.title())){
670670 Query ret = new WildcardQuery(makeTerm());
671671 ret.setBoost(defaultBoost);
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java
@@ -193,8 +193,10 @@
194194 int queueSize = 0;
195195 do{
196196 queueSize = messenger.getIndexerQueueSize(iid.getIndexHost());
197 - if(queueSize >= maxQueueSize)
 197+ if(queueSize >= maxQueueSize){
 198+ log.info("Remote queue is "+queueSize+", sleeping for 5s");
198199 Thread.sleep(5000); // sleep five seconds then retry
 200+ }
199201 } while(queueSize >= maxQueueSize);
200202
201203 log.info(iid+": Sending "+records.size()+" records to indexer");
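
Editorial note: reassembled without the diff markers, the updater's backoff loop now reads as follows (messenger, iid, and maxQueueSize as in the surrounding code):

    int queueSize = 0;
    do {
        queueSize = messenger.getIndexerQueueSize(iid.getIndexHost());
        if (queueSize >= maxQueueSize) {
            log.info("Remote queue is " + queueSize + ", sleeping for 5s");
            Thread.sleep(5000); // sleep five seconds, then retry
        }
    } while (queueSize >= maxQueueSize);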
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/util/RecentUpdatesDaemon.java
@@ -1,361 +0,0 @@
2 -package org.wikimedia.lsearch.util;
3 -
4 -import java.io.BufferedReader;
5 -import java.io.IOException;
6 -import java.io.InputStreamReader;
7 -import java.io.PrintWriter;
8 -import java.net.DatagramPacket;
9 -import java.net.DatagramSocket;
10 -import java.net.InetAddress;
11 -import java.net.ServerSocket;
12 -import java.net.Socket;
13 -import java.net.SocketException;
14 -import java.util.HashSet;
15 -import java.util.Hashtable;
16 -import java.util.StringTokenizer;
17 -import java.util.Map.Entry;
18 -import java.util.concurrent.ExecutorService;
19 -import java.util.concurrent.Executors;
20 -
21 -import org.apache.log4j.Logger;
22 -import org.wikimedia.lsearch.config.Configuration;
23 -
24 -/**
25 - * UDP Server for MWSearch extension. Receives update notification
26 - * from MWSearchUpdateHook.php, and offers TCP interface
27 - * (for luceneUpdate.php) to retrieve all articles updates since
28 - * last call to the daemon (FETCH NEW), or if there was a problem
29 - * with last round of update the previous updates (FETCH OLD).
30 - * <p>
31 - * Daemon can be paired with another one (each one is hotspare for
32 - * the other). Using UDP signalling they try to manage aproximatelly
33 - * the same queue status (maintaing exactly same status would require
34 - * much larger overhead). Hook should send update notifications to both.
35 - * Indexer should always use a single daemon, and revert to the other
36 - * only if the first is down.
37 - *
38 - * <p>
39 - * Each line in UDP packet is of syntax:<br>
40 - * dbname [UPDATE|DELETE] namespace:title<br>
41 - *
42 - * or for maintaining hotspares: dbname FETCHED
43 - *
44 - * <p>
45 - * TCP query is of syntax:<br>
46 - * [FETCH|RESTORE] [NEW|OLD] [ON dbname]<br>
47 - *
48 - * <p>
49 - * TCP response:<br>
50 - * dbname [UPDATE|DELETE] namespace:title<br>
51 - * ...
52 - *
53 - * @author rainman
54 - *
55 - */
56 -public class RecentUpdatesDaemon {
57 -
58 - /** dbname -> ns:title -> operation */
59 - protected Hashtable<String,Hashtable<String,String>> queue = new Hashtable<String,Hashtable<String,String>>();
60 - protected Hashtable<String,Hashtable<String,String>> oldqueue = new Hashtable<String,Hashtable<String,String>>();
61 -
62 - /** use this lock when modifying queue */
63 - protected Object lock = new Object();
64 -
65 - /** The UDP daemon that recieves update notifications from MediaWiki hook */
66 - class UDPServer extends Thread {
67 - org.apache.log4j.Logger log = Logger.getLogger(UDPServer.class);
68 - protected DatagramSocket socket = null;
69 -
70 - public UDPServer(){
71 - Configuration conf = Configuration.open();
72 - int udpPort = conf.getInt("RecentUpdateDaemon","udp",8111);
73 - try {
74 - socket = new DatagramSocket(udpPort);
75 - log.info("UDP server up at port "+udpPort);
76 - } catch (SocketException e) {
77 - log.fatal("Cannot make UDP server at port "+udpPort+" : "+e.getMessage());
78 - }
79 - }
80 -
81 - @Override
82 - public void run() {
83 - byte[] buf = new byte[1500];
84 -
85 - for(;;){
86 - // receive request
87 - DatagramPacket packet = new DatagramPacket(buf, buf.length);
88 - try {
89 - socket.receive(packet);
90 - // handle request
91 - String mesg = new String(packet.getData(),packet.getOffset(),packet.getLength(),"UTF-8");
92 - StringTokenizer st = new StringTokenizer(mesg,"\n\r");
93 - while(st.hasMoreTokens()){
94 - String s = st.nextToken();
95 - if(s.trim().equals(""))
96 - continue;
97 - String[] parts = s.split(" +",3);
98 - // check if it's request form other (hotspare) daemon
99 - if(parts.length == 2 && parts[1].equals("FETCHED")){
100 - if(queue.get(parts[0])!=null)
101 - oldqueue.put(parts[0],queue.remove(parts[0]));
102 - else
103 - oldqueue.remove(parts[0]);
104 - log.debug("Update for "+parts[0]+" fetched on other daemon");
105 - continue;
106 - // syntax check
107 - } else if(parts.length != 3){
108 - log.warn("Recieved bad syntax: "+s);
109 - continue;
110 - }
111 - String dbname = parts[0];
112 - String oper = parts[1];
113 - String title = parts[2];
114 - if(!oper.equals("UPDATE") && !oper.equals("DELETE")){
115 - log.warn("Unrecognized operation (should be UPDATE or DELETE): "+parts[2]);
116 - continue;
117 - }
118 - log.debug("Processing "+dbname+" "+oper+" "+title);
119 - // update queue
120 - synchronized(lock){
121 - Hashtable<String,String> titles = queue.get(dbname);
122 - if(titles == null){
123 - titles = new Hashtable<String,String>();
124 - queue.put(dbname,titles);
125 - }
126 - titles.put(title,oper);
127 - }
128 - }
129 - } catch (IOException e) {
130 - log.warn("I/O error receiving UDP packet: "+e.getMessage());
131 - }
132 - }
133 - }
134 -
135 -
136 - }
137 -
138 - /** TCP worker thread, handles requests */
139 - class TCPDaemon extends Thread {
140 - org.apache.log4j.Logger log = Logger.getLogger(TCPDaemon.class);
141 - protected BufferedReader in;
142 - protected PrintWriter out;
143 -
144 - public TCPDaemon(Socket sock) {
145 - try {
146 - in = new BufferedReader(new InputStreamReader(sock.getInputStream()));
147 - out = new PrintWriter(sock.getOutputStream(),true);
148 - } catch (IOException e) {
149 - log.warn("Error openning input/output streams");
150 - }
151 - }
152 -
153 - @Override
154 - public void run() {
155 -
156 - try {
157 - handle();
158 - } catch (Exception e) {
159 - log.warn("Error processing request: "+e.getMessage());
160 - } finally{
161 - try { out.close(); } catch(Exception e) { }
162 - try { in.close(); } catch(Exception e) { }
163 - }
164 - }
165 -
166 - /** Single TCP request handler */
167 - protected void handle() throws IOException {
168 - String line = in.readLine();
169 - boolean fetchnew = false, notify = false;
170 - boolean restorenew = false;
171 -
172 - log.debug("Got request "+line);
173 -
174 - String db = null;
175 - if(line.contains("ON")){
176 - String[] p = line.split(" ");
177 - db = p[p.length-1];
178 - }
179 -
180 - if(line.startsWith("FETCH NEW")){
181 - fetchnew = true;
182 - notify = true;
183 - } else if(line.startsWith("RESTORE NEW"))
184 - restorenew = true;
185 - else if(line.startsWith("RESTORE OLD"))
186 - fetchnew = false;
187 - else if(line.startsWith("FETCH OLD"))
188 - fetchnew = false;
189 - else{
190 - log.warn("Invalid request: "+line);
191 - return;
192 - }
193 - HashSet<String> changedDBs = new HashSet<String>();
194 - if(fetchnew){
195 - synchronized(lock){
196 - if(db == null){
197 - changedDBs.addAll(oldqueue.keySet());
198 - changedDBs.addAll(queue.keySet());
199 - oldqueue = queue;
200 - queue = new Hashtable<String,Hashtable<String,String>>();
201 - } else if(queue.get(db)!=null){
202 - changedDBs.add(db);
203 - oldqueue.put(db,queue.remove(db));
204 - } else
205 - oldqueue.remove(db);
206 - }
207 - }
208 -
209 - // notify the backup daemon
210 - if(notify){
211 - for(String dbname : changedDBs)
212 - sendHotspareNotification(dbname+" FETCHED\n");
213 - }
214 -
215 - // respond
216 - if(restorenew){
217 - // need to clone queue to make its iterator thread-safe
218 - Hashtable<String,Hashtable<String,String>> q = (Hashtable<String, Hashtable<String, String>>) queue.clone();
219 - for(Entry<String,Hashtable<String,String>> et : q.entrySet()){
220 - write(et.getKey(),(Hashtable<String, String>)et.getValue().clone());
221 - }
222 - } else if(db!=null && oldqueue.get(db)!=null){
223 - write(db,oldqueue.get(db));
224 - } else if(db==null){
225 - for(Entry<String,Hashtable<String,String>> et : oldqueue.entrySet()){
226 - write(et.getKey(),et.getValue());
227 - }
228 - }
229 - }
230 -
231 - /** Write out one db hashtable as: dbname operation ns:title */
232 - protected void write(String db, Hashtable<String,String> titles){
233 - if(titles == null)
234 - return;
235 - for(Entry<String,String> to : titles.entrySet()){
236 - String line = db+" "+to.getValue()+" "+to.getKey();
237 - log.debug("<<< "+line);
238 - out.println(line);
239 - }
240 - }
241 -
242 - /** Send UDP packet to hotspare RecentUpdatesDaemon keeping it in sync */
243 - public void sendHotspareNotification(String data){
244 - if(hotspareHost==null || hotspareUdpPort==0)
245 - return; // no hotspare
246 - try{
247 - if(udpSocket == null)
248 - udpSocket = new DatagramSocket();
249 - byte[] buf = data.getBytes();
250 - InetAddress address = InetAddress.getByName(hotspareHost);
251 - DatagramPacket packet = new DatagramPacket(buf, buf.length, address, hotspareUdpPort);
252 - udpSocket.send(packet);
253 - } catch(Exception e){
254 - log.warn("Error sending datagram to hotspare: "+e.getMessage());
255 - }
256 -
257 - }
258 -
259 - }
260 -
261 - /** Recieves TCP connections */
262 - class TCPServer extends Thread {
263 - org.apache.log4j.Logger log = Logger.getLogger(TCPServer.class);
264 - @Override
265 - public void run() {
266 - int maxThreads = 10;
267 - ServerSocket sock;
268 -
269 - Configuration config = Configuration.open();
270 - int port = config.getInt("RecentUpdateDaemon","tcp",8112);
271 - try {
272 - sock = new ServerSocket(port);
273 - } catch (Exception e) {
274 - log.fatal("Cannot make TCP server at port "+port+" : " + e.getMessage());
275 - return;
276 - }
277 - ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
278 - log.info("TCP server up at port "+port);
279 -
280 - for (;;) {
281 - Socket client;
282 - try {
283 - client = sock.accept();
284 - } catch (Exception e) {
285 - log.error("accept() error: " + e.getMessage());
286 - continue;
287 - }
288 -
289 - TCPDaemon worker = new TCPDaemon(client);
290 - pool.execute(worker);
291 - }
292 -
293 - }
294 -
295 - }
296 - protected DatagramSocket udpSocket=null;
297 - protected String hotspareHost=null;
298 - protected int hotspareUdpPort, hotspareTcpPort;
299 -
300 - /** Fetch queue from hotspare RecentUpdatesDaemon using command */
301 - public Hashtable<String,Hashtable<String,String>> fetchQueue(String command){
302 - Hashtable<String,Hashtable<String,String>> res = new Hashtable<String,Hashtable<String,String>>();
303 -
304 - Socket socket = null;
305 - PrintWriter out = null;
306 - BufferedReader in = null;
307 -
308 - try {
309 - socket = new Socket(hotspareHost, hotspareTcpPort);
310 - out = new PrintWriter(socket.getOutputStream(), true);
311 - in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
312 -
313 - out.println(command);
314 -
315 - String line;
316 - int count=0;
317 - while((line = in.readLine()) != null){
318 - String[] parts = line.split(" ",3);
319 - String dbname = parts[0];
320 - String oper = parts[1];
321 - String title = parts[2];
322 -
323 - Hashtable<String,String> titles = res.get(dbname);
324 - if(titles == null){
325 - titles = new Hashtable<String,String>();
326 - res.put(dbname,titles);
327 - }
328 - titles.put(title,oper);
329 - count++;
330 - }
331 - System.out.println("Retrieved queue of size "+count+" from hotspare daemon");
332 - out.close();
333 - in.close();
334 - socket.close();
335 - } catch (IOException e) {
336 - System.out.println("Warning: Could not get queue from the hotspare daemon at "+hotspareHost+": "+e.getMessage());
337 - }
338 - return res;
339 - }
340 -
341 - public RecentUpdatesDaemon(){
342 - Configuration config = Configuration.open(); // init log4j
343 - hotspareHost = config.getString("RecentUpdateDaemon","hostspareHost");
344 - hotspareUdpPort = config.getInt("RecentUpdateDaemon","hostspareUdpPort",8111);
345 - hotspareTcpPort = config.getInt("RecentUpdateDaemon","hostspareTcpPort",8112);
346 -
347 - // try to restore queues from the hotspare
348 - if(hotspareHost != null){
349 - queue = fetchQueue("RESTORE NEW");
350 - oldqueue = fetchQueue("RESTORE OLD");
351 - }
352 -
353 - new UDPServer().start();
354 - new TCPServer().start();
355 - }
356 -
357 - public static void main(String args[]){
358 - new RecentUpdatesDaemon();
359 - }
360 -
361 -
362 -}
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/util/Localization.java
@@ -224,7 +224,7 @@
225225 return new Title(canonicalNamespaces.get(ns),parts[1]);
226226 // check lang namespaces
227227 Hashtable<String,Integer> map = namespaces.get(lang);
228 - if(map.containsKey(ns))
 228+ if(map!=null && map.containsKey(ns))
229229 return new Title(map.get(ns),parts[1]);
230230 }
231231 // not recognized namespace, using main
Index: trunk/lucene-search-2.0/lsearch.conf
@@ -82,6 +82,10 @@
8383 # Log, ganglia, localization
8484 ################################################
8585
 86+# If this host runs on multiple CPUs maintain a pool of index searchers
 87+# It's good idea to make it number of CPUs+1, or some larger odd number
 88+SearcherPool.size=3
 89+
8690 # URL to MediaWiki message files
8791 Localization.url=file:///var/www/html/wiki-lucene/phase3/languages/messages
8892
@@ -92,14 +96,6 @@
9397 # Max queue size on remote indexer after which we wait a bit
9498 OAI.maxqueue=5000
9599
96 -# RecentUpdateDaemon udp and tcp ports
97 -RecentUpdateDaemon.udp=8111
98 -RecentUpdateDaemon.tcp=8112
99 -# Hot spare
100 -RecentUpdateDaemon.hostspareHost=vega
101 -RecentUpdateDaemon.hostspareUdpPort=8111
102 -RecentUpdateDaemon.hostspareTcpPort=8112
103 -
104100 # Log configuration
105101 Logging.logconfig=/etc/mwsearch.log4j
106102
