Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/test/WikiQueryParserTest.java |
— | — | @@ -34,7 +34,6 @@ |
35 | 35 | Configuration.setConfigFile(System.getProperty("user.dir")+"/test-data/mwsearch.conf.test"); |
36 | 36 | Configuration.open(); |
37 | 37 | WikiQueryParser.TITLE_BOOST = 2; |
38 | | - WikiQueryParser.REDIRECT_BOOST = 0.2f; |
39 | 38 | WikiQueryParser.ALT_TITLE_BOOST = 6; |
40 | 39 | WikiQueryParser.KEYWORD_BOOST = 0.05f; |
41 | 40 | WikiIndexModifier.ALT_TITLES = 3; |
— | — | @@ -173,7 +172,7 @@ |
174 | 173 | assertEquals("+(+(contents:beans contents:bean^0.5) +category:food) +(+contents:orchid +category:\"some flowers\")",q.toString()); |
175 | 174 | |
176 | 175 | q = parser.parseRaw("(Beans AND incategory:FOod) (orchID AND incategory:\"some FLOWERS\")"); |
177 | | - assertEquals("+(+(contents:beans contents:bean^0.5) +category:FOod) +(+contents:orchid +category:\"some FLOWERS\")",q.toString()); |
| 176 | + assertEquals("+(+(contents:beans contents:bean^0.5) +category:food) +(+contents:orchid +category:\"some flowers\")",q.toString()); |
178 | 177 | |
179 | 178 | q = parser.parse("(beans AND incategory:food) (orchid AND incategory:\"some flowers\")"); |
180 | 179 | assertEquals("+(+(+(contents:beans contents:bean^0.5) title:beans^2.0) +category:food) +(+(+contents:orchid +category:\"some flowers\") title:orchid^2.0)",q.toString()); |
— | — | @@ -341,6 +340,12 @@ |
342 | 341 | q = parser.parseTwoPass("all_talk: beans everyone",NamespacePolicy.REWRITE); |
343 | 342 | assertEquals("(+(namespace:1 namespace:3 namespace:5 namespace:7 namespace:9 namespace:11 namespace:13 namespace:15) +(+(contents:beans contents:bean^0.5) +(contents:everyone contents:everyon^0.5))) (+(namespace:1 namespace:3 namespace:5 namespace:7 namespace:9 namespace:11 namespace:13 namespace:15) +(+title:beans^2.0 +title:everyone^2.0))",q.toString()); |
344 | 343 | |
| 344 | + // German |
| 345 | + analyzer = Analyzers.getSearcherAnalyzer("de"); |
| 346 | + bs = new FieldBuilder("de").getBuilder(); |
| 347 | + parser = new WikiQueryParser(bs.getFields().contents(),"0",analyzer,bs,NamespacePolicy.IGNORE); |
| 348 | + q = parser.parseTwoPass("welche rolle spielen Mineralstoffe in der Ernährung?",NamespacePolicy.IGNORE); |
| 349 | + assertEquals("(+(contents:welche contents:welch^0.5) +(contents:rolle contents:roll^0.5) +(contents:spielen contents:spiel^0.5) +(contents:mineralstoffe contents:mineralstoff^0.5) +contents:in +contents:der +(+(contents:ernahrung contents:ernahr^0.5) (contents:ernaehrung contents:ernaehr^0.5))) (+title:welche^2.0 +title:rolle^2.0 +title:spielen^2.0 +title:mineralstoffe^2.0 +title:in^2.0 +title:der^2.0 +(title:ernahrung^2.0 title:ernaehrung^2.0))",q.toString()); |
345 | 350 | |
346 | 351 | // Test field extraction |
347 | 352 | HashSet<NamespaceFilter> fs = parser.getFieldNamespaces("main:something [1]:else all:oh []:nja"); |
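The new German test pins down how umlauts are handled: the expected query shows a term like "Ernährung" expanding to both a diacritic-stripped form (`ernahrung`) and a spelled-out form (`ernaehrung`). A standalone sketch of that dual folding (class and method names are illustrative, not the project's analyzer code):

```java
// Hypothetical illustration of the dual umlaut folding the German test
// expects; the real work happens inside the "de" analyzer/FieldBuilder.
public class UmlautFold {
    static String stripDiacritics(String w) {
        return w.toLowerCase().replace("ä", "a").replace("ö", "o")
                .replace("ü", "u").replace("ß", "ss");
    }
    static String spellOut(String w) {
        return w.toLowerCase().replace("ä", "ae").replace("ö", "oe")
                .replace("ü", "ue").replace("ß", "ss");
    }
    public static void main(String[] args) {
        System.out.println(stripDiacritics("Ernährung")); // ernahrung
        System.out.println(spellOut("Ernährung"));        // ernaehrung
    }
}
```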
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/SearcherCache.java |
— | — | @@ -19,6 +19,7 @@ |
20 | 20 | import org.apache.lucene.search.Searchable; |
21 | 21 | import org.apache.lucene.search.SearchableMul; |
22 | 22 | import org.wikimedia.lsearch.beans.SearchHost; |
| 23 | +import org.wikimedia.lsearch.config.Configuration; |
23 | 24 | import org.wikimedia.lsearch.config.GlobalConfiguration; |
24 | 25 | import org.wikimedia.lsearch.config.IndexId; |
25 | 26 | import org.wikimedia.lsearch.config.IndexRegistry; |
— | — | @@ -57,16 +58,15 @@ |
58 | 59 | log.warn("I/O error closing searchables "+s); |
59 | 60 | } |
60 | 61 | } |
61 | | - } |
| 62 | + } |
62 | 63 | |
63 | | - public static final int OPEN_SEARCHERS = 4; |
64 | | - |
65 | 64 | /** Holds a configurable number of index searchers, for multiprocessor workstations */ |
66 | 65 | public static class SearcherPool { |
67 | | - IndexSearcherMul searchers[] = new IndexSearcherMul[OPEN_SEARCHERS]; |
| 66 | + IndexSearcherMul searchers[]; |
68 | 67 | |
69 | | - SearcherPool(IndexId iid, String path) throws IOException { |
70 | | - for(int i=0;i<OPEN_SEARCHERS;i++){ |
| 68 | + SearcherPool(IndexId iid, String path, int poolsize) throws IOException { |
| 69 | + searchers = new IndexSearcherMul[poolsize]; |
| 70 | + for(int i=0;i<poolsize;i++){ |
71 | 71 | searchers[i] = open(iid, path); |
72 | 72 | } |
73 | 73 | } |
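The pool replaces the removed hard-coded `OPEN_SEARCHERS = 4` with a caller-supplied size (ultimately `SearcherPool.size` from lsearch.conf, read in the constructor further down). The diff only shows the pool being filled and warmed, so the hand-out policy below is an assumption:

```java
// Generic sketch of a fixed-size searcher pool; round-robin checkout is
// assumed, not shown in this commit.
class RoundRobinPool<T> {
    private final T[] items;
    private int next = 0;
    RoundRobinPool(T[] items) { this.items = items; }
    synchronized T checkout() {
        T item = items[next];
        next = (next + 1) % items.length;
        return item;
    }
}
```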
— | — | @@ -105,6 +105,8 @@ |
106 | 106 | /** searchable -> host */ |
107 | 107 | protected Hashtable<Searchable,String> searchableHost; |
108 | 108 | |
| 109 | + public int searchPoolSize = 1; |
| 110 | + |
109 | 111 | /** lazy initialization of search indexes (set of dbroles) */ |
110 | 112 | protected Set<String> initialized; |
111 | 113 | |
— | — | @@ -112,11 +114,6 @@ |
113 | 115 | * update via NetworkStatusThread */ |
114 | 116 | protected Set<SearchHost> deadHosts; |
115 | 117 | |
116 | | - /** Instances of searchables in use by some searcher */ |
117 | | - protected Hashtable<Searchable,SimpleInt> inUse; |
118 | | - /** Searchables that should be closed after all searchers are done with them */ |
119 | | - protected HashSet<Searchable> closeQueue; |
120 | | - |
121 | 118 | static protected SearcherCache instance = null; |
122 | 119 | |
123 | 120 | protected Object lock; |
— | — | @@ -212,14 +209,22 @@ |
213 | 210 | try { |
214 | 211 | if(iid.isLogical()) |
215 | 212 | continue; |
216 | | - IndexSearcherMul is = getLocalSearcher(iid); |
217 | | - Warmup.warmupIndexSearcher(is,iid,false); |
| 213 | + for(IndexSearcherMul is : getSearcherPool(iid)) |
| 214 | + Warmup.warmupIndexSearcher(is,iid,false); |
218 | 215 | } catch (IOException e) { |
219 | 216 | log.warn("I/O error warming index for "+iid); |
220 | 217 | } |
221 | 218 | } |
222 | 219 | } |
223 | | - |
| 220 | + |
| 221 | + /** Get all searchers for iid; open/create the pool if it doesn't exist */ |
| 222 | + private IndexSearcherMul[] getSearcherPool(IndexId iid) throws IOException { |
| 223 | + SearcherPool pool = localCache.get(iid.toString()); |
| 224 | + if(pool == null) |
| 225 | + addLocalSearcherToCache(iid); |
| 226 | + return localCache.get(iid.toString()).searchers; |
| 227 | + } |
| 228 | + |
224 | 229 | /** |
225 | 230 | * Make a searchable instance, and add it to cache |
226 | 231 | * @return the created searchable instance |
— | — | @@ -254,7 +259,11 @@ |
255 | 260 | synchronized(iid){ |
256 | 261 | // make sure some other thread has not opened the searcher |
257 | 262 | if(localCache.get(iid.toString()) == null){ |
258 | | - SearcherPool pool = new SearcherPool(iid,iid.getCanonicalSearchPath()); |
| 263 | + if(!iid.isMySearch()) |
| 264 | + throw new IOException(iid+" is not searched by this host."); |
| 265 | + if(iid.isLogical()) |
| 266 | + throw new IOException(iid+" is a logical index and will not be opened."); |
| 267 | + SearcherPool pool = new SearcherPool(iid,iid.getCanonicalSearchPath(),searchPoolSize); |
259 | 268 | localCache.put(iid.toString(),pool); |
260 | 269 | for(IndexSearcherMul s : pool.searchers) |
261 | 270 | searchableHost.put(s,""); |
— | — | @@ -392,9 +401,7 @@ |
393 | 402 | * @param searcher |
394 | 403 | */ |
395 | 404 | public IndexSearcherMul[] invalidateLocalSearcher(IndexId iid, SearcherPool newpool) { |
396 | | - IndexSearcherMul olds; |
397 | 405 | log.debug("Invalidating local searcher for "+iid); |
398 | | - ArrayList<SearchableMul> close = new ArrayList<SearchableMul>(); |
399 | 406 | synchronized(lock){ |
400 | 407 | SearcherPool oldpool = localCache.get(iid.toString()); |
401 | 408 | // put in the new value |
— | — | @@ -405,67 +412,14 @@ |
406 | 413 | return newpool.searchers; // no old searcher |
407 | 414 | for(IndexSearcherMul s : oldpool.searchers){ |
408 | 415 | searchableHost.remove(s); |
409 | | - // close the old index searcher (immediately, or queue it if in use) |
410 | | - SimpleInt useCount = inUse.get(s); |
411 | | - if(useCount == null || useCount.count == 0) |
412 | | - close.add(s); |
413 | | - else{ |
414 | | - log.debug("Searcher for "+iid+" will be closed after local searches are done."); |
415 | | - } |
416 | | - |
417 | | - closeQueue.add(s); |
| 416 | + // deferred close |
| 417 | + log.debug("Deferred closure of searcher "+s); |
| 418 | + new DeferredClose(s,15000).start(); |
418 | 419 | } |
419 | 420 | } |
420 | | - // close outside of sync block |
421 | | - for(SearchableMul s : close) |
422 | | - closeSearcher(s); |
423 | | - |
424 | 421 | return newpool.searchers; |
425 | 422 | } |
426 | | - |
427 | | - /** Tell the cache that the searcher is in use */ |
428 | | - public void checkout(SearchableMul searcher) { |
429 | | - synchronized(lock){ |
430 | | - SimpleInt i = inUse.get(searcher); |
431 | | - if(i == null) |
432 | | - inUse.put(searcher,new SimpleInt(1)); |
433 | | - else |
434 | | - i.count++; |
435 | | - } |
436 | | - } |
437 | | - |
438 | | - /** Tell the cache that the searcher is no longer in use */ |
439 | | - public void release(SearchableMul searcher) { |
440 | | - synchronized(lock){ |
441 | | - SimpleInt i = inUse.get(searcher); |
442 | | - if(i == null) |
443 | | - log.warn("Error in release(): returned a Searcher that was not checked out. "+searcher); |
444 | | - else |
445 | | - i.count--; |
446 | | - } |
447 | | - |
448 | | - closeSearcher(searcher); |
449 | | - } |
450 | 423 | |
451 | | - protected void closeSearcher(SearchableMul searcher){ |
452 | | - boolean close = false; |
453 | | - synchronized(lock){ |
454 | | - SimpleInt used = inUse.get(searcher); |
455 | | - if(used == null || used.count == 0){ |
456 | | - if(closeQueue.contains(searcher)){ |
457 | | - closeQueue.remove(searcher); |
458 | | - searchableHost.remove(searcher); |
459 | | - close = true; |
460 | | - } |
461 | | - } |
462 | | - } |
463 | | - // deferred close |
464 | | - if(close){ |
465 | | - log.debug("Deferred closure of searcher "+searcher); |
466 | | - new DeferredClose(searcher,15000).start(); |
467 | | - } |
468 | | - } |
469 | | - |
470 | 424 | /** Get a copy of array of dead hosts */ |
471 | 425 | public HashSet<SearchHost> getDeadHosts(){ |
472 | 426 | synchronized(lock){ |
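The reference-counted checkout/release machinery is gone: invalidation now schedules each stale searcher for a timed close. `DeferredClose` itself is not part of this diff; inferred from its call site `new DeferredClose(s,15000).start()`, it presumably looks roughly like this:

```java
import java.io.IOException;
import org.apache.lucene.search.Searchable;

// Presumed shape of DeferredClose (only its call site appears in this
// commit): wait out a grace period so in-flight searches against the old
// searcher can finish, then close it.
class DeferredClose extends Thread {
    private final Searchable searchable;
    private final long delayMs;
    DeferredClose(Searchable searchable, long delayMs) {
        this.searchable = searchable;
        this.delayMs = delayMs;
        setDaemon(true); // don't keep the JVM alive just to close a searcher
    }
    @Override
    public void run() {
        try {
            Thread.sleep(delayMs);
            searchable.close();
        } catch (InterruptedException e) {
            // shutdown is taking over; skip the close
        } catch (IOException e) {
            // the real code would log this
        }
    }
}
```

The trade-off, as far as this diff shows: a search still running 15 seconds after invalidation could touch a closed searcher, in exchange for dropping the per-query checkout/release synchronization that WikiSearcher.java sheds below.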
— | — | @@ -488,15 +442,18 @@ |
489 | 443 | localCache = new Hashtable<String,SearcherPool>(); |
490 | 444 | deadHosts = Collections.synchronizedSet(new HashSet<SearchHost>()); |
491 | 445 | global = GlobalConfiguration.getInstance(); |
492 | | - inUse = new Hashtable<Searchable,SimpleInt>(); |
493 | | - closeQueue = new HashSet<Searchable>(); |
494 | 446 | searchableHost = new Hashtable<Searchable,String>(); |
495 | 447 | remoteKeys = new Hashtable<String,Set<String>>(); |
496 | 448 | lock = new Object(); |
497 | 449 | initialized = Collections.synchronizedSet(new HashSet<String>()); |
| 450 | + searchPoolSize = Configuration.open().getInt("SearcherPool","size",1); |
498 | 451 | } |
499 | 452 | |
| 453 | + public int getSearchPoolSize() { |
| 454 | + return searchPoolSize; |
| 455 | + } |
500 | 456 | |
| 457 | + |
501 | 458 | |
502 | 459 | |
503 | 460 | } |
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/WikiSearcher.java |
— | — | @@ -100,7 +100,6 @@ |
101 | 101 | if(searcher == null) |
102 | 102 | throw new Exception("Error constructing searcher, check logs."); |
103 | 103 | |
104 | | - cache.checkout(searcher); |
105 | 104 | } |
106 | 105 | |
107 | 106 | /** Got host for the iid within this multi searcher */ |
— | — | @@ -114,8 +113,6 @@ |
115 | 114 | |
116 | 115 | @Override |
117 | 116 | public void close() throws IOException { |
118 | | - cache.release(searcher); |
119 | | - |
120 | 117 | } |
121 | 118 | |
122 | 119 | @Override |
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/UpdateThread.java |
— | — | @@ -208,7 +208,7 @@ |
209 | 209 | searchpath.mkdir(); |
210 | 210 | |
211 | 211 | // check if updated index is a valid one (throws an exception on error) |
212 | | - SearcherCache.SearcherPool pool = new SearcherCache.SearcherPool(iid,li.path); |
| 212 | + SearcherCache.SearcherPool pool = new SearcherCache.SearcherPool(iid,li.path,cache.getSearchPoolSize()); |
213 | 213 | |
214 | 214 | // refresh the symlink |
215 | 215 | command = "/bin/rm -rf "+iid.getSearchPath(); |
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/search/SearchEngine.java |
— | — | @@ -105,7 +105,7 @@ |
106 | 106 | NamespaceFilterWrapper localfilter = filter; |
107 | 107 | if(iid.isMainsplit() && iid.isMainPart()) |
108 | 108 | localfilter = null; |
109 | | - else if(iid.isNssplit() && !iid.isLogical() && iid.getNamespaceSet().size()==1) |
| 109 | + else if(iid.isNssplit() && !iid.isLogical() && iid.getNamespaceSet().size()==1 && !iid.getNamespaceSet().contains("<default>")) |
110 | 110 | localfilter = null; |
111 | 111 | if(localfilter != null) |
112 | 112 | log.info("Using local filter: "+localfilter); |
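Extracted as a standalone predicate, the corrected rule reads as follows (a sketch assuming the namespace set behaves like a `Set<String>`):

```java
import java.util.Set;

// Sketch of the corrected rule: a single-namespace nssplit part needs no
// per-query namespace filter -- unless its one "namespace" is the
// catch-all "<default>" part, which actually mixes many namespaces.
class FilterRule {
    static boolean canDropLocalFilter(boolean mainsplit, boolean mainPart,
            boolean nssplit, boolean logical, Set<String> namespaces) {
        if (mainsplit && mainPart)
            return true;
        return nssplit && !logical
                && namespaces.size() == 1
                && !namespaces.contains("<default>");
    }
}
```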
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/index/IndexThread.java |
— | — | @@ -321,7 +321,7 @@ |
322 | 322 | IndexReader reader = IndexReader.open(iid.getImportPath()); |
323 | 323 | if(!reader.isOptimized()){ |
324 | 324 | reader.close(); |
325 | | - log.debug("Optimizing "+iid); |
| 325 | + log.info("Optimizing "+iid); |
326 | 326 | long start = System.currentTimeMillis(); |
327 | 327 | Transaction trans = new Transaction(iid); |
328 | 328 | trans.begin(); |
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/analyzers/WikiQueryParser.java |
— | — | @@ -21,6 +21,7 @@ |
22 | 22 | import org.apache.lucene.search.spans.SpanNearQuery; |
23 | 23 | import org.apache.lucene.search.spans.SpanQuery; |
24 | 24 | import org.apache.lucene.search.spans.SpanTermQuery; |
| 25 | +import org.mediawiki.importer.ExactListFilter; |
25 | 26 | import org.wikimedia.lsearch.config.GlobalConfiguration; |
26 | 27 | import org.wikimedia.lsearch.index.WikiIndexModifier; |
27 | 28 | import org.wikimedia.lsearch.search.NamespaceFilter; |
— | — | @@ -78,7 +79,6 @@ |
79 | 80 | public static float TITLE_ALIAS_BOOST = 0.2f; |
80 | 81 | public static float STEM_TITLE_BOOST = 2; |
81 | 82 | public static float STEM_TITLE_ALIAS_BOOST = 0.4f; |
82 | | - public static float REDIRECT_BOOST = 0.2f; |
83 | 83 | public static float ALT_TITLE_BOOST = 2; |
84 | 84 | public static float ALT_TITLE_ALIAS_BOOST = 0.4f; |
85 | 85 | public static float KEYWORD_BOOST = 0.02f; |
— | — | @@ -296,7 +296,7 @@ |
297 | 297 | continue; // ignore whitespaces |
298 | 298 | |
299 | 299 | // pluses and minuses, underscores can be within words, * is for wildcard queries |
300 | | - if(Character.isLetterOrDigit(ch) || ch=='-' || ch=='+' || ch=='_' || ch=='*' || ch=='?'){ |
| 300 | + if(Character.isLetterOrDigit(ch) || ch=='-' || ch=='+' || ch=='_' || ch=='*'){ |
301 | 301 | buffer[length++] = ch; |
302 | 302 | } else{ |
303 | 303 | cur--; // position before the nonletter character |
— | — | @@ -396,13 +396,13 @@ |
397 | 397 | /** Make a lucene term from string */ |
398 | 398 | private Term makeTerm(String t){ |
399 | 399 | if(field == null) |
400 | | - return new Term(defaultField,t); |
| 400 | + return new Term(defaultField,builder.isExactCase()? t : t.toLowerCase()); |
401 | 401 | else if(!field.equals("incategory") && |
402 | 402 | (namespacePolicy == NamespacePolicy.IGNORE || |
403 | 403 | namespacePolicy == NamespacePolicy.REWRITE)) |
404 | 404 | return new Term(defaultField,t); |
405 | 405 | else if(field.equals("incategory")) |
406 | | - return new Term("category",t); |
| 406 | + return new Term("category",builder.isExactCase()? t : t.toLowerCase()); |
407 | 407 | else |
408 | 408 | return new Term(field,t); |
409 | 409 | } |
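This is the fix exercised by the updated test expectation at the top of this commit: `incategory:FOod` must now produce `category:food`, because the index stores lowercased terms unless the field builder is exact-case. The normalization itself is a one-liner; `builder.isExactCase()` is real in this diff, the helper form is just illustration:

```java
// Fold user-typed terms to lower case for case-insensitive indexes.
static String normalizeTerm(String t, boolean exactCase) {
    return exactCase ? t : t.toLowerCase();
}
```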
— | — | @@ -664,7 +664,7 @@ |
665 | 665 | |
666 | 666 | // check for wildcard searches; they are also not analyzed/stemmed, only for titles |
667 | 667 | // wildcard signs are allowed only at the end of the word, minimum one letter word |
668 | | - if(length>1 && Character.isLetter(buffer[0]) && (buffer[length-1]=='*' || buffer[length-1]=='?') && |
| 668 | + if(length>1 && Character.isLetter(buffer[0]) && buffer[length-1]=='*' && |
669 | 669 | defaultField.equals(fields.title())){ |
670 | 670 | Query ret = new WildcardQuery(makeTerm()); |
671 | 671 | ret.setBoost(defaultBoost); |
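Together with the tokenizer change above, this removes `?` as a wildcard character entirely; only a single trailing `*` on a title-field token still triggers a `WildcardQuery`. The guard as a standalone predicate (the parser checks its own buffer/length/defaultField state instead):

```java
// '?' is no longer a wildcard anywhere; only a token of at least one
// letter followed by a single trailing '*', on the title field, still
// yields a WildcardQuery.
static boolean isTitleWildcard(char[] buffer, int length, boolean titleField) {
    return titleField
            && length > 1
            && Character.isLetter(buffer[0])
            && buffer[length - 1] == '*';
}
```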
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/oai/IncrementalUpdater.java |
— | — | @@ -193,8 +193,10 @@ |
194 | 194 | int queueSize = 0; |
195 | 195 | do{ |
196 | 196 | queueSize = messenger.getIndexerQueueSize(iid.getIndexHost()); |
197 | | - if(queueSize >= maxQueueSize) |
| 197 | + if(queueSize >= maxQueueSize){ |
| 198 | + log.info("Remote queue is "+queueSize+", sleeping for 5s"); |
198 | 199 | Thread.sleep(5000); // sleep five seconds then retry |
| 200 | + } |
199 | 201 | } while(queueSize >= maxQueueSize); |
200 | 202 | |
201 | 203 | log.info(iid+": Sending "+records.size()+" records to indexer"); |
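The new braces only add a log line inside the back-off branch, so the message appears once per 5-second sleep rather than on every pass through the loop. The loop in isolation (identifiers like `messenger` and `maxQueueSize` come from the surrounding diff; the method wrapper is illustrative):

```java
// Poll the remote indexer queue and back off in 5s steps while it is full.
void waitForIndexerRoom() throws InterruptedException {
    int queueSize;
    do {
        queueSize = messenger.getIndexerQueueSize(iid.getIndexHost());
        if (queueSize >= maxQueueSize) {
            log.info("Remote queue is " + queueSize + ", sleeping for 5s");
            Thread.sleep(5000);
        }
    } while (queueSize >= maxQueueSize);
}
```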
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/util/RecentUpdatesDaemon.java |
— | — | @@ -1,361 +0,0 @@ |
2 | | -package org.wikimedia.lsearch.util; |
3 | | - |
4 | | -import java.io.BufferedReader; |
5 | | -import java.io.IOException; |
6 | | -import java.io.InputStreamReader; |
7 | | -import java.io.PrintWriter; |
8 | | -import java.net.DatagramPacket; |
9 | | -import java.net.DatagramSocket; |
10 | | -import java.net.InetAddress; |
11 | | -import java.net.ServerSocket; |
12 | | -import java.net.Socket; |
13 | | -import java.net.SocketException; |
14 | | -import java.util.HashSet; |
15 | | -import java.util.Hashtable; |
16 | | -import java.util.StringTokenizer; |
17 | | -import java.util.Map.Entry; |
18 | | -import java.util.concurrent.ExecutorService; |
19 | | -import java.util.concurrent.Executors; |
20 | | - |
21 | | -import org.apache.log4j.Logger; |
22 | | -import org.wikimedia.lsearch.config.Configuration; |
23 | | - |
24 | | -/** |
25 | | - * UDP server for the MWSearch extension. Receives update notifications |
26 | | - * from MWSearchUpdateHook.php, and offers a TCP interface |
27 | | - * (for luceneUpdate.php) to retrieve all article updates since the |
28 | | - * last call to the daemon (FETCH NEW), or, if there was a problem |
29 | | - * with the last round of updates, the previous batch (FETCH OLD). |
30 | | - * <p> |
31 | | - * The daemon can be paired with another (each is a hotspare for the |
32 | | - * other). Using UDP signalling they try to maintain approximately the |
33 | | - * same queue status (maintaining exactly the same status would require |
34 | | - * much larger overhead). The hook should send update notifications to |
35 | | - * both. The indexer should always use a single daemon, and revert to |
36 | | - * the other only if the first is down. |
37 | | - * |
38 | | - * <p> |
39 | | - * Each line in UDP packet is of syntax:<br> |
40 | | - * dbname [UPDATE|DELETE] namespace:title<br> |
41 | | - * |
42 | | - * or for maintaining hotspares: dbname FETCHED |
43 | | - * |
44 | | - * <p> |
45 | | - * TCP query is of syntax:<br> |
46 | | - * [FETCH|RESTORE] [NEW|OLD] [ON dbname]<br> |
47 | | - * |
48 | | - * <p> |
49 | | - * TCP response:<br> |
50 | | - * dbname [UPDATE|DELETE] namespace:title<br> |
51 | | - * ... |
52 | | - * |
53 | | - * @author rainman |
54 | | - * |
55 | | - */ |
56 | | -public class RecentUpdatesDaemon { |
57 | | - |
58 | | - /** dbname -> ns:title -> operation */ |
59 | | - protected Hashtable<String,Hashtable<String,String>> queue = new Hashtable<String,Hashtable<String,String>>(); |
60 | | - protected Hashtable<String,Hashtable<String,String>> oldqueue = new Hashtable<String,Hashtable<String,String>>(); |
61 | | - |
62 | | - /** use this lock when modifying queue */ |
63 | | - protected Object lock = new Object(); |
64 | | - |
65 | | - /** The UDP daemon that receives update notifications from the MediaWiki hook */ |
66 | | - class UDPServer extends Thread { |
67 | | - org.apache.log4j.Logger log = Logger.getLogger(UDPServer.class); |
68 | | - protected DatagramSocket socket = null; |
69 | | - |
70 | | - public UDPServer(){ |
71 | | - Configuration conf = Configuration.open(); |
72 | | - int udpPort = conf.getInt("RecentUpdateDaemon","udp",8111); |
73 | | - try { |
74 | | - socket = new DatagramSocket(udpPort); |
75 | | - log.info("UDP server up at port "+udpPort); |
76 | | - } catch (SocketException e) { |
77 | | - log.fatal("Cannot make UDP server at port "+udpPort+" : "+e.getMessage()); |
78 | | - } |
79 | | - } |
80 | | - |
81 | | - @Override |
82 | | - public void run() { |
83 | | - byte[] buf = new byte[1500]; |
84 | | - |
85 | | - for(;;){ |
86 | | - // receive request |
87 | | - DatagramPacket packet = new DatagramPacket(buf, buf.length); |
88 | | - try { |
89 | | - socket.receive(packet); |
90 | | - // handle request |
91 | | - String mesg = new String(packet.getData(),packet.getOffset(),packet.getLength(),"UTF-8"); |
92 | | - StringTokenizer st = new StringTokenizer(mesg,"\n\r"); |
93 | | - while(st.hasMoreTokens()){ |
94 | | - String s = st.nextToken(); |
95 | | - if(s.trim().equals("")) |
96 | | - continue; |
97 | | - String[] parts = s.split(" +",3); |
98 | | - // check if it's a request from the other (hotspare) daemon |
99 | | - if(parts.length == 2 && parts[1].equals("FETCHED")){ |
100 | | - if(queue.get(parts[0])!=null) |
101 | | - oldqueue.put(parts[0],queue.remove(parts[0])); |
102 | | - else |
103 | | - oldqueue.remove(parts[0]); |
104 | | - log.debug("Update for "+parts[0]+" fetched on other daemon"); |
105 | | - continue; |
106 | | - // syntax check |
107 | | - } else if(parts.length != 3){ |
108 | | - log.warn("Received bad syntax: "+s); |
109 | | - continue; |
110 | | - } |
111 | | - String dbname = parts[0]; |
112 | | - String oper = parts[1]; |
113 | | - String title = parts[2]; |
114 | | - if(!oper.equals("UPDATE") && !oper.equals("DELETE")){ |
115 | | - log.warn("Unrecognized operation (should be UPDATE or DELETE): "+oper); |
116 | | - continue; |
117 | | - } |
118 | | - log.debug("Processing "+dbname+" "+oper+" "+title); |
119 | | - // update queue |
120 | | - synchronized(lock){ |
121 | | - Hashtable<String,String> titles = queue.get(dbname); |
122 | | - if(titles == null){ |
123 | | - titles = new Hashtable<String,String>(); |
124 | | - queue.put(dbname,titles); |
125 | | - } |
126 | | - titles.put(title,oper); |
127 | | - } |
128 | | - } |
129 | | - } catch (IOException e) { |
130 | | - log.warn("I/O error receiving UDP packet: "+e.getMessage()); |
131 | | - } |
132 | | - } |
133 | | - } |
134 | | - |
135 | | - |
136 | | - } |
137 | | - |
138 | | - /** TCP worker thread, handles requests */ |
139 | | - class TCPDaemon extends Thread { |
140 | | - org.apache.log4j.Logger log = Logger.getLogger(TCPDaemon.class); |
141 | | - protected BufferedReader in; |
142 | | - protected PrintWriter out; |
143 | | - |
144 | | - public TCPDaemon(Socket sock) { |
145 | | - try { |
146 | | - in = new BufferedReader(new InputStreamReader(sock.getInputStream())); |
147 | | - out = new PrintWriter(sock.getOutputStream(),true); |
148 | | - } catch (IOException e) { |
149 | | - log.warn("Error opening input/output streams"); |
150 | | - } |
151 | | - } |
152 | | - |
153 | | - @Override |
154 | | - public void run() { |
155 | | - |
156 | | - try { |
157 | | - handle(); |
158 | | - } catch (Exception e) { |
159 | | - log.warn("Error processing request: "+e.getMessage()); |
160 | | - } finally{ |
161 | | - try { out.close(); } catch(Exception e) { } |
162 | | - try { in.close(); } catch(Exception e) { } |
163 | | - } |
164 | | - } |
165 | | - |
166 | | - /** Single TCP request handler */ |
167 | | - protected void handle() throws IOException { |
168 | | - String line = in.readLine(); |
169 | | - boolean fetchnew = false, notify = false; |
170 | | - boolean restorenew = false; |
171 | | - |
172 | | - log.debug("Got request "+line); |
173 | | - |
174 | | - String db = null; |
175 | | - if(line.contains("ON")){ |
176 | | - String[] p = line.split(" "); |
177 | | - db = p[p.length-1]; |
178 | | - } |
179 | | - |
180 | | - if(line.startsWith("FETCH NEW")){ |
181 | | - fetchnew = true; |
182 | | - notify = true; |
183 | | - } else if(line.startsWith("RESTORE NEW")) |
184 | | - restorenew = true; |
185 | | - else if(line.startsWith("RESTORE OLD")) |
186 | | - fetchnew = false; |
187 | | - else if(line.startsWith("FETCH OLD")) |
188 | | - fetchnew = false; |
189 | | - else{ |
190 | | - log.warn("Invalid request: "+line); |
191 | | - return; |
192 | | - } |
193 | | - HashSet<String> changedDBs = new HashSet<String>(); |
194 | | - if(fetchnew){ |
195 | | - synchronized(lock){ |
196 | | - if(db == null){ |
197 | | - changedDBs.addAll(oldqueue.keySet()); |
198 | | - changedDBs.addAll(queue.keySet()); |
199 | | - oldqueue = queue; |
200 | | - queue = new Hashtable<String,Hashtable<String,String>>(); |
201 | | - } else if(queue.get(db)!=null){ |
202 | | - changedDBs.add(db); |
203 | | - oldqueue.put(db,queue.remove(db)); |
204 | | - } else |
205 | | - oldqueue.remove(db); |
206 | | - } |
207 | | - } |
208 | | - |
209 | | - // notify the backup daemon |
210 | | - if(notify){ |
211 | | - for(String dbname : changedDBs) |
212 | | - sendHotspareNotification(dbname+" FETCHED\n"); |
213 | | - } |
214 | | - |
215 | | - // respond |
216 | | - if(restorenew){ |
217 | | - // need to clone queue to make its iterator thread-safe |
218 | | - Hashtable<String,Hashtable<String,String>> q = (Hashtable<String, Hashtable<String, String>>) queue.clone(); |
219 | | - for(Entry<String,Hashtable<String,String>> et : q.entrySet()){ |
220 | | - write(et.getKey(),(Hashtable<String, String>)et.getValue().clone()); |
221 | | - } |
222 | | - } else if(db!=null && oldqueue.get(db)!=null){ |
223 | | - write(db,oldqueue.get(db)); |
224 | | - } else if(db==null){ |
225 | | - for(Entry<String,Hashtable<String,String>> et : oldqueue.entrySet()){ |
226 | | - write(et.getKey(),et.getValue()); |
227 | | - } |
228 | | - } |
229 | | - } |
230 | | - |
231 | | - /** Write out one db hashtable as: dbname operation ns:title */ |
232 | | - protected void write(String db, Hashtable<String,String> titles){ |
233 | | - if(titles == null) |
234 | | - return; |
235 | | - for(Entry<String,String> to : titles.entrySet()){ |
236 | | - String line = db+" "+to.getValue()+" "+to.getKey(); |
237 | | - log.debug("<<< "+line); |
238 | | - out.println(line); |
239 | | - } |
240 | | - } |
241 | | - |
242 | | - /** Send UDP packet to hotspare RecentUpdatesDaemon keeping it in sync */ |
243 | | - public void sendHotspareNotification(String data){ |
244 | | - if(hotspareHost==null || hotspareUdpPort==0) |
245 | | - return; // no hotspare |
246 | | - try{ |
247 | | - if(udpSocket == null) |
248 | | - udpSocket = new DatagramSocket(); |
249 | | - byte[] buf = data.getBytes(); |
250 | | - InetAddress address = InetAddress.getByName(hotspareHost); |
251 | | - DatagramPacket packet = new DatagramPacket(buf, buf.length, address, hotspareUdpPort); |
252 | | - udpSocket.send(packet); |
253 | | - } catch(Exception e){ |
254 | | - log.warn("Error sending datagram to hotspare: "+e.getMessage()); |
255 | | - } |
256 | | - |
257 | | - } |
258 | | - |
259 | | - } |
260 | | - |
261 | | - /** Receives TCP connections */ |
262 | | - class TCPServer extends Thread { |
263 | | - org.apache.log4j.Logger log = Logger.getLogger(TCPServer.class); |
264 | | - @Override |
265 | | - public void run() { |
266 | | - int maxThreads = 10; |
267 | | - ServerSocket sock; |
268 | | - |
269 | | - Configuration config = Configuration.open(); |
270 | | - int port = config.getInt("RecentUpdateDaemon","tcp",8112); |
271 | | - try { |
272 | | - sock = new ServerSocket(port); |
273 | | - } catch (Exception e) { |
274 | | - log.fatal("Cannot make TCP server at port "+port+" : " + e.getMessage()); |
275 | | - return; |
276 | | - } |
277 | | - ExecutorService pool = Executors.newFixedThreadPool(maxThreads); |
278 | | - log.info("TCP server up at port "+port); |
279 | | - |
280 | | - for (;;) { |
281 | | - Socket client; |
282 | | - try { |
283 | | - client = sock.accept(); |
284 | | - } catch (Exception e) { |
285 | | - log.error("accept() error: " + e.getMessage()); |
286 | | - continue; |
287 | | - } |
288 | | - |
289 | | - TCPDaemon worker = new TCPDaemon(client); |
290 | | - pool.execute(worker); |
291 | | - } |
292 | | - |
293 | | - } |
294 | | - |
295 | | - } |
296 | | - protected DatagramSocket udpSocket=null; |
297 | | - protected String hotspareHost=null; |
298 | | - protected int hotspareUdpPort, hotspareTcpPort; |
299 | | - |
300 | | - /** Fetch queue from hotspare RecentUpdatesDaemon using command */ |
301 | | - public Hashtable<String,Hashtable<String,String>> fetchQueue(String command){ |
302 | | - Hashtable<String,Hashtable<String,String>> res = new Hashtable<String,Hashtable<String,String>>(); |
303 | | - |
304 | | - Socket socket = null; |
305 | | - PrintWriter out = null; |
306 | | - BufferedReader in = null; |
307 | | - |
308 | | - try { |
309 | | - socket = new Socket(hotspareHost, hotspareTcpPort); |
310 | | - out = new PrintWriter(socket.getOutputStream(), true); |
311 | | - in = new BufferedReader(new InputStreamReader(socket.getInputStream())); |
312 | | - |
313 | | - out.println(command); |
314 | | - |
315 | | - String line; |
316 | | - int count=0; |
317 | | - while((line = in.readLine()) != null){ |
318 | | - String[] parts = line.split(" ",3); |
319 | | - String dbname = parts[0]; |
320 | | - String oper = parts[1]; |
321 | | - String title = parts[2]; |
322 | | - |
323 | | - Hashtable<String,String> titles = res.get(dbname); |
324 | | - if(titles == null){ |
325 | | - titles = new Hashtable<String,String>(); |
326 | | - res.put(dbname,titles); |
327 | | - } |
328 | | - titles.put(title,oper); |
329 | | - count++; |
330 | | - } |
331 | | - System.out.println("Retrieved queue of size "+count+" from hotspare daemon"); |
332 | | - out.close(); |
333 | | - in.close(); |
334 | | - socket.close(); |
335 | | - } catch (IOException e) { |
336 | | - System.out.println("Warning: Could not get queue from the hotspare daemon at "+hotspareHost+": "+e.getMessage()); |
337 | | - } |
338 | | - return res; |
339 | | - } |
340 | | - |
341 | | - public RecentUpdatesDaemon(){ |
342 | | - Configuration config = Configuration.open(); // init log4j |
343 | | - hotspareHost = config.getString("RecentUpdateDaemon","hostspareHost"); |
344 | | - hotspareUdpPort = config.getInt("RecentUpdateDaemon","hostspareUdpPort",8111); |
345 | | - hotspareTcpPort = config.getInt("RecentUpdateDaemon","hostspareTcpPort",8112); |
346 | | - |
347 | | - // try to restore queues from the hotspare |
348 | | - if(hotspareHost != null){ |
349 | | - queue = fetchQueue("RESTORE NEW"); |
350 | | - oldqueue = fetchQueue("RESTORE OLD"); |
351 | | - } |
352 | | - |
353 | | - new UDPServer().start(); |
354 | | - new TCPServer().start(); |
355 | | - } |
356 | | - |
357 | | - public static void main(String args[]){ |
358 | | - new RecentUpdatesDaemon(); |
359 | | - } |
360 | | - |
361 | | - |
362 | | -} |
Index: trunk/lucene-search-2.0/src/org/wikimedia/lsearch/util/Localization.java |
— | — | @@ -224,7 +224,7 @@ |
225 | 225 | return new Title(canonicalNamespaces.get(ns),parts[1]); |
226 | 226 | // check lang namespaces |
227 | 227 | Hashtable<String,Integer> map = namespaces.get(lang); |
228 | | - if(map.containsKey(ns)) |
| 228 | + if(map!=null && map.containsKey(ns)) |
229 | 229 | return new Title(map.get(ns),parts[1]); |
230 | 230 | } |
231 | 231 | // not recognized namespace, using main |
Index: trunk/lucene-search-2.0/lsearch.conf |
— | — | @@ -82,6 +82,10 @@ |
83 | 83 | # Log, ganglia, localization |
84 | 84 | ################################################ |
85 | 85 | |
| 86 | +# If this host has multiple CPUs, maintain a pool of index searchers. |
| 87 | +# It's a good idea to make this the number of CPUs+1, or some larger odd number. |
| 88 | +SearcherPool.size=3 |
| 89 | + |
86 | 90 | # URL to MediaWiki message files |
87 | 91 | Localization.url=file:///var/www/html/wiki-lucene/phase3/languages/messages |
88 | 92 | |
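The key is consumed by the SearcherCache constructor shown earlier in this commit, with a default of 1 so single-CPU hosts need no config change:

```java
// From SearcherCache's constructor above; a missing key means a pool of one.
int searchPoolSize = Configuration.open().getInt("SearcherPool", "size", 1);
```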
— | — | @@ -92,14 +96,6 @@ |
93 | 97 | # Max queue size on remote indexer after which we wait a bit |
94 | 98 | OAI.maxqueue=5000 |
95 | 99 | |
96 | | -# RecentUpdateDaemon udp and tcp ports |
97 | | -RecentUpdateDaemon.udp=8111 |
98 | | -RecentUpdateDaemon.tcp=8112 |
99 | | -# Hot spare |
100 | | -RecentUpdateDaemon.hostspareHost=vega |
101 | | -RecentUpdateDaemon.hostspareUdpPort=8111 |
102 | | -RecentUpdateDaemon.hostspareTcpPort=8112 |
103 | | - |
104 | 100 | # Log configuration |
105 | 101 | Logging.logconfig=/etc/mwsearch.log4j |
106 | 102 | |