r10205 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r10204‎ | r10205 | r10206 >
Date:12:54, 19 July 2005
Author:vibber
Status:old
Tags:
Comment:
* A newly queued update for a page now overwrites any update already queued for that same page, so the index does not end up with duplicate entries
* Auto-flush is now configurable: both the queued-item count threshold and the delay since the last flush
* Removed the single-database flush method (the 'searchupdater.flush' RPC)
* For now, the 'searchupdater.flushAll' RPC returns immediately; the actual flush happens
on the update thread the next time it checks the queue
* Status line includes time since last flush
Modified paths:
  • /trunk/mwsearch/Search/Configuration.cs (modified) (history)
  • /trunk/mwsearch/Search/SearchState.cs (modified) (history)
  • /trunk/mwsearch/UpdateDaemon/Daemon.cs (modified) (history)
  • /trunk/mwsearch/UpdateDaemon/UpdateRecord.cs (modified) (history)
  • /trunk/mwsearch/UpdateDaemon/UpdateThread.cs (modified) (history)

Diff [purge]

Index: trunk/mwsearch/Search/SearchState.cs
@@ -302,7 +302,7 @@
303303 log.Debug("Nothing to delete for " + key);
304304 for (int i = 0; i < hits.Length(); i++) {
305305 int id = hits.Id(i);
306 - log.Debug("Trying to delete article number " + id + "for " + key);
 306+ log.Debug("Trying to delete article number " + id + " for " + key);
307307 try {
308308 reader.Delete(id);
309309 } catch (IOException e) {
Index: trunk/mwsearch/Search/Configuration.cs
@@ -98,6 +98,17 @@
9999 return s != null && s.Equals("true");
100100 }
101101
 102+ public int GetInt(string section, string name, int defaultValue) {
 103+ string s = GetString(section, name);
 104+ if (s == null)
 105+ return defaultValue;
 106+ try {
 107+ return int.Parse(s);
 108+ } catch (Exception e) {
 109+ return defaultValue;
 110+ }
 111+ }
 112+
102113 public bool IsLatin1(string dbname) {
103114 foreach (string name in GetArray("Database", "latin1")) {
104115 if (dbname.Equals(name)) {
Index: trunk/mwsearch/UpdateDaemon/UpdateRecord.cs
@@ -31,17 +31,26 @@
3232 public string Text;
3333 }
3434
35 - public interface UpdateRecord {
36 - string Database {
37 - get;
 35+ public abstract class UpdateRecord {
 36+ protected string _database;
 37+ protected Article _article;
 38+
 39+ public string Database {
 40+ get {
 41+ return _database;
 42+ }
3843 }
39 - void Apply(SearchState state);
 44+
 45+ public string Key {
 46+ get {
 47+ return _article.Key;
 48+ }
 49+ }
 50+
 51+ public abstract void Apply(SearchState state);
4052 }
4153
4254 public class PageUpdate : UpdateRecord {
43 - string _database;
44 - Article _article;
45 -
4655 public PageUpdate(string databaseName, Title title, string text) {
4756 _database = databaseName;
4857 _article = new Article(databaseName,
@@ -49,9 +58,7 @@
5059 text, "bogus timestamp");
5160 }
5261
53 - public string Database { get { return _database; } }
54 -
55 - public void Apply(SearchState state) {
 62+ public override void Apply(SearchState state) {
5663 state.ReplaceArticle(_article);
5764 }
5865
@@ -61,17 +68,12 @@
6269 }
6370
6471 public class PageDeletion : UpdateRecord {
65 - string _database;
66 - Article _article;
67 -
6872 public PageDeletion(string databaseName, Title title) {
6973 _database = databaseName;
7074 _article = new Article(title.Namespace, title.Text);
7175 }
7276
73 - public string Database { get { return _database; } }
74 -
75 - public void Apply(SearchState state) {
 77+ public override void Apply(SearchState state) {
7678 state.DeleteArticle(_article);
7779 }
7880
Index: trunk/mwsearch/UpdateDaemon/Daemon.cs
@@ -48,7 +48,7 @@
4949 "SearchUpdater",
5050 WellKnownObjectMode.Singleton);
5151
52 - UpdateThread.Run();
 52+ UpdateThread.Run(config);
5353 }
5454
5555 [XmlRpcMethod("searchupdater.updatePage")]
@@ -80,15 +80,9 @@
8181 return true;
8282 }
8383
84 - [XmlRpcMethod("searchupdater.flush")]
85 - public bool Flush(string databaseName) {
86 - UpdateThread.Flush(databaseName);
87 - return true;
88 - }
89 -
9084 [XmlRpcMethod("searchupdater.flushAll")]
9185 public bool FlushAll() {
92 - UpdateThread.FlushAll();
 86+ UpdateThread.Flush();
9387 return true;
9488 }
9589
Index: trunk/mwsearch/UpdateDaemon/UpdateThread.cs
@@ -34,44 +34,72 @@
3535
3636 static bool _isRunning = false;
3737 static bool _done = false;
 38+ static bool _flushNow = false;
3839 static object _threadLock = new object();
39 - static Queue _updateQueue;
4040
 41+ // If more than this number are queued, try to flush out updates
 42+ static int _maxQueueCount;
 43+
 44+ // If more than this many seconds have passed since last flush,
 45+ // initiate a flush-out.
 46+ static int _maxQueueTimeout;
 47+ static DateTime _lastFlush = DateTime.UtcNow;
 48+
 49+ // A hash table of hash tables, dbname -> title key -> UpdateRecord.
 50+ // Run all accesses behind _threadLock for safety.
 51+ static Hashtable _queuedUpdates;
 52+
4153 static UpdateThread() {
42 - _updateQueue = Queue.Synchronized(new Queue());
 54+ _queuedUpdates = new Hashtable();
4355 }
4456
45 - public static void Run() {
 57+ public static void Run(Configuration config) {
 58+ _maxQueueCount = config.GetInt( "Updater", "maxqueuecount", 500 );
 59+ _maxQueueTimeout = config.GetInt( "Updater", "maxqueuetimeout", 3600 );
4660 Start();
4761 while (!_done) {
4862 ApplyUpdates();
4963 Thread.Sleep(1000);
5064 }
 65+
 66+ // Apply any remaining updates before we quit
 67+ ApplyAll(_queuedUpdates);
 68+
5169 log.Info("Updater thread ending, quit requested.");
5270 }
5371
5472 public static void ApplyUpdates() {
5573 log.Debug("Checking for updates...");
56 - while (true) {
 74+ try {
 75+ Hashtable workUpdates = null;
5776 lock (_threadLock) {
58 - if (!_isRunning) {
 77+ if (!_isRunning && !_flushNow) {
5978 log.Debug("Update thread suspended.");
6079 return;
6180 }
6281
63 - try {
64 - UpdateRecord next = (UpdateRecord)_updateQueue.Dequeue();
65 - SearchState state = GetSearchState(next.Database);
66 - log.Info("Applying: " + next);
67 - next.Apply(state);
68 - } catch (InvalidOperationException) {
69 - log.Debug("All done!");
 82+ int queuedCount = Count;
 83+ if (queuedCount == 0) {
 84+ _flushNow = false;
 85+ log.Debug("Nothing to do.");
7086 return;
71 - } catch (Exception e) {
72 - log.Error("Unexpected error in update thread: " + e);
 87+ }
 88+
 89+ TimeSpan delta = (DateTime.UtcNow - _lastFlush);
 90+ if (!_flushNow
 91+ && delta.Seconds < _maxQueueTimeout
 92+ && queuedCount < _maxQueueCount) {
 93+ log.DebugFormat("{0} queued items waiting, {1} since last flush...",
 94+ queuedCount, delta);
7395 return;
7496 }
 97+
 98+ workUpdates = SwitchOut();
7599 }
 100+ ApplyAll(workUpdates);
 101+ } catch (Exception e) {
 102+ log.Error("Unexpected error in update thread: " + e);
 103+ return;
76104 }
77105 }
78106
@@ -81,20 +109,42 @@
82110 return state;
83111 }
84112
 113+ private static Hashtable SwitchOut() {
 114+ lock (_threadLock) {
 115+ log.Info("Preparing to flush all indexes...");
 116+
 117+ Hashtable workUpdates = _queuedUpdates;
 118+ _queuedUpdates = new Hashtable();
 119+ _lastFlush = DateTime.UtcNow;
 120+ _flushNow = false;
 121+
 122+ return workUpdates;
 123+ }
 124+ }
 125+
 126+ private static void ApplyOn(string databaseName, ICollection queue) {
 127+ log.Info("Applying updates to " + databaseName);
 128+ SearchState state = GetSearchState(databaseName);
 129+ foreach (UpdateRecord record in queue) {
 130+ log.Info("Applying: " + record);
 131+ record.Apply(state);
 132+ }
 133+ state.Close();
 134+ log.Info("Closed updates on " + databaseName);
 135+ }
 136+
 137+ private static void ApplyAll(Hashtable workUpdates) {
 138+ foreach (string dbname in workUpdates.Keys) {
 139+ ApplyOn(dbname, ((Hashtable)workUpdates[dbname]).Values);
 140+ }
 141+ }
 142+
85143 public static void Stop() {
86144 lock (_threadLock) {
87145 if (_isRunning) {
88146 log.InfoFormat("Stopping update thread, {0} updates queued",
89 - _updateQueue.Count);
 147+ Count);
90148 _isRunning = false;
91 -
92 - try {
93 - int resetStatesCount = SearchState.ResetStates();
94 - log.InfoFormat("Reset {0} search index states, {1} updates queued",
95 - resetStatesCount, _updateQueue.Count);
96 - } catch (Exception e) {
97 - log.Error("Error resetting indexes: " + e);
98 - }
99149 }
100150 }
101151 }
@@ -103,15 +153,22 @@
104154 lock (_threadLock) {
105155 if (!_isRunning) {
106156 log.InfoFormat("Starting update thread, {0} updates queued",
107 - _updateQueue.Count);
 157+ Count);
108158 _isRunning = true;
109159 }
110160 }
111161 }
112162
113163 public static void Enqueue(UpdateRecord record) {
 164+ lock (_threadLock) {
 165+ if (_queuedUpdates[record.Database] == null)
 166+ _queuedUpdates[record.Database] = new Hashtable();
 167+
 168+ // Supersede any prior queued update for this same page
 169+ // so we don't end up with duplicates in the index.
 170+ ((Hashtable)_queuedUpdates[record.Database])[record.Key] = record;
 171+ }
114172 log.Info("Queued item: " + record);
115 - _updateQueue.Enqueue(record);
116173 }
117174
118175 public static void Quit() {
@@ -120,39 +177,31 @@
121178 Stop();
122179 }
123180
 181+ public static int Count {
 182+ get {
 183+ lock (_threadLock) {
 184+ int n = 0;
 185+ foreach (string dbname in _queuedUpdates.Keys) {
 186+ n += ((Hashtable)_queuedUpdates[dbname]).Count;
 187+ }
 188+ return n;
 189+ }
 190+ }
 191+ }
 192+
124193 public static string GetStatus() {
125 - int count = _updateQueue.Count;
126 - return string.Format("Updater {0} running; {1} item{2} queued.",
 194+ int count = Count;
 195+ TimeSpan delta = (DateTime.UtcNow - _lastFlush);
 196+ return string.Format("Updater {0} running; {1} item{2} queued. {3} since last flush.",
127197 (_isRunning ? "IS" : "IS NOT" ),
128198 count,
129 - (count == 1 ? "" : "s" ));
 199+ (count == 1 ? "" : "s" ),
 200+ delta);
130201 }
131202
132 - public static void Flush(string databaseName) {
133 - lock (_threadLock) {
134 - log.InfoFormat("Flushing index for {0}, {1} updates queued",
135 - databaseName, _updateQueue.Count);
136 - SearchState state = SearchState.ForWiki(databaseName);
137 - state.Reopen();
138 - log.InfoFormat("Done flushing {0}, {1} updates queued",
139 - databaseName, _updateQueue.Count);
140 - }
 203+ public static void Flush() {
 204+ log.Info("Flush requested.");
 205+ _flushNow = true;
141206 }
142 -
143 - public static void FlushAll() {
144 - lock (_threadLock) {
145 - log.InfoFormat("Flushing all indexes, {0} updates queued",
146 - _updateQueue.Count);
147 -
148 - try {
149 - int resetStatesCount = SearchState.ResetStates();
150 - log.InfoFormat("Reset {0} search index states, {1} updates queued",
151 - resetStatesCount, _updateQueue.Count);
152 - } catch (Exception e) {
153 - log.Error("Error resetting indexes: " + e);
154 - }
155 - }
156 - }
157 -
158207 }
159208 }

Status & tagging log