r70004 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r70003 | r70004 | r70005 >
Date:11:43, 27 July 2010
Author:daniel
Status:deferred
Tags:
Comment:
progress indicator, periodic flushing, optional re-loading
Modified paths:
  • /trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java (modified) (history)

Diff [purge]

Index: trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java
@@ -9,7 +9,6 @@
1010 import java.sql.SQLException;
1111 import java.util.ArrayList;
1212 import java.util.Collection;
13 -import java.util.Iterator;
1413 import java.util.List;
1514 import java.util.Map;
1615
@@ -33,6 +32,7 @@
3433 import de.brightbyte.data.cursor.DataCursor;
3534 import de.brightbyte.db.DatabaseAccess;
3635 import de.brightbyte.db.DatabaseUtil;
 36+import de.brightbyte.db.DatabaseDataSet.Cursor;
3737 import de.brightbyte.io.ChunkingCursor;
3838 import de.brightbyte.io.LineCursor;
3939 import de.brightbyte.io.Output;
@@ -43,6 +43,70 @@
4444 import de.brightbyte.util.SystemUtils;
4545
4646 public class CatGraph extends ConsoleApp {
 47+ public class ListElementPairCursor implements
 48+ DataCursor<Pair<Integer, Integer>> {
 49+
 50+ private DataCursor<? extends List<?>> cursor;
 51+ private int aField;
 52+ private int bField;
 53+
 54+ public ListElementPairCursor(DataCursor<? extends List<?>> cursor, int aField, int bField) {
 55+ this.cursor = cursor;
 56+ this.aField = aField;
 57+ this.bField = bField;
 58+ }
 59+
 60+ public void close() {
 61+ cursor.close();
 62+ }
 63+
 64+ public Pair<Integer, Integer> next() throws PersistenceException {
 65+ List<?> row = cursor.next();
 66+ if ( row == null ) return null;
 67+
 68+ int a = DatabaseUtil.asInt( row.get(aField) );
 69+ int b = DatabaseUtil.asInt( row.get(bField) );
 70+
 71+ return new Pair<Integer, Integer>(a, b);
 72+ }
 73+
 74+ }
 75+
 76+ public class ResultSetPairCursor implements
 77+ DataCursor<Pair<Integer, Integer>> {
 78+
 79+ private ResultSet cursor;
 80+ private int aField;
 81+ private int bField;
 82+
 83+ public ResultSetPairCursor(ResultSet cursor, int aField, int bField) {
 84+ this.cursor = cursor;
 85+ this.aField = aField;
 86+ this.bField = bField;
 87+ }
 88+
 89+ public void close() {
 90+ try {
 91+ cursor.close();
 92+ } catch (SQLException e) {
 93+ //ignore silently
 94+ }
 95+ }
 96+
 97+ public Pair<Integer, Integer> next() throws PersistenceException {
 98+ try {
 99+ if (!cursor.next()) return null;
 100+
 101+ int a = DatabaseUtil.asInt( cursor.getObject(aField) );
 102+ int b = DatabaseUtil.asInt( cursor.getObject(bField) );
 103+
 104+ return new Pair<Integer, Integer>( a, b );
 105+ } catch (SQLException e) {
 106+ throw new PersistenceException();
 107+ }
 108+ }
 109+ }
 110+
47111 protected class Descendants implements Command {
48112
49113 private int start;
@@ -63,56 +127,71 @@
64128
65129 private GraphDatabaseService graphDb;
66130 private IndexService indexer;
 131+ private long chunkSize = 100000;
67132
68133 public CatGraph(GraphDatabaseService graphDb, IndexService indexer) {
69134 this.graphDb = graphDb;
70135 this.indexer = indexer;
71136 }
72137
73 - public void loadArcs(DatabaseAccess db, String sql, int fromCol, int toCol) throws SQLException {
74 - ResultSet rs = db.executeQuery("load graph", sql);
75 - while (rs.next()) {
76 - int from = rs.getInt(fromCol);
77 - int to = rs.getInt(toCol);
78 -
79 - putArc(from ,to);
 138+ public void loadArcs(DatabaseAccess db, String sql, int fromCol, int toCol) throws PersistenceException {
 139+ try {
 140+ ResultSet rs = db.executeQuery("load graph", sql);
 141+ loadArcs( new ResultSetPairCursor(rs, fromCol, toCol) );
 142+ } catch (SQLException e) {
 143+ throw new PersistenceException(e);
80144 }
81145 }
82146
83147 public void loadArcs(DataCursor<? extends List<?>> args, int fromCol, int toCol) throws PersistenceException {
84 - ChunkedProgressRateTracker progressTracker = new ChunkedProgressRateTracker("arcs");
85 -
86 - List<?> row ;
87 - while ((row = args.next()) != null) {
88 - int from = DatabaseUtil.asInt( row.get(fromCol) );
89 - int to = DatabaseUtil.asInt( row.get(toCol) );
90 -
91 - putArc(from ,to);
92 -
93 - progressTracker.step();
94 - if ( progressTracker.chunkIf(10000, 10) ) {
95 - out.println(progressTracker);
96 - }
97 - }
 148+ loadArcs( new ListElementPairCursor(args, fromCol, toCol) );
98149 }
99150
100151 public void loadArcs(DataCursor<Pair<Integer, Integer>> args) throws PersistenceException {
101152 ChunkedProgressRateTracker progressTracker = new ChunkedProgressRateTracker("arcs");
102153
103 - Pair<Integer, Integer> row ;
104 - while ((row = args.next()) != null) {
105 - int from = row.getA();
106 - int to = row.getB();
 154+ Transaction tx = null;
 155+ boolean done = false;
 156+ try {
 157+ Pair<Integer, Integer> row ;
 158+ while ((row = args.next()) != null) {
 159+ int from = row.getA();
 160+ int to = row.getB();
 161+
 162+ if (tx==null) tx = graphDb.beginTx();
 163+
 164+ putArc(from ,to);
 165+
 166+ progressTracker.step();
 167+ //out.println("adding "+from+" -> "+to+" (#"+progressTracker.getCurrentChunkSize()+")");
 168+
 169+ if ( progressTracker.chunkIf(chunkSize , 10) ) {
 170+ if (tx!=null) {
 171+ long t = System.currentTimeMillis();
 172+ out.println("committing...");
 173+ tx.success();
 174+ tx.finish();
 175+ tx = null;
 176+ out.println("commit took "+(System.currentTimeMillis() - t)+"ms.");
 177+ }
 178+
 179+ out.println(progressTracker);
 180+ }
 181+ }
107182
108 - putArc(from ,to);
109 -
110 - progressTracker.step();
111 - if ( progressTracker.chunkIf(10000, 10) ) {
112 - out.println(progressTracker);
 183+ done = true;
 184+ } finally {
 185+ if ( tx != null) {
 186+ if (done) tx.success();
 187+ else tx.failure();
 188+
 189+ tx.finish();
113190 }
114191 }
 192+
115193 }
116194
 195+ /*
117196 public void loadRoots(DatabaseAccess db, String sql) throws SQLException {
118197 ResultSet rs = db.executeQuery("load graph", sql);
119198 while (rs.next()) {
@@ -137,6 +216,19 @@
138217 else return n;
139218 }
140219
 220+ public Relationship putRoot(int root) {
 221+ return putRoot( aquireNodeByPageId(root) );
 222+ }
 223+
 224+ public Relationship putRoot(Node root) {
 225+ Node ref = graphDb.getReferenceNode();
 226+ if (ref.getId() == root.getId()) return null;
 227+
 228+ Relationship relationship = ref.createRelationshipTo( root, CategoryRelationships.CONTAINS );
 229+ return relationship;
 230+ }
 231+*/
 232+
141233 public Node getNodeByPageId(int pageId) {
142234 return indexer.getSingleNode("page_id", pageId);
143235 }
@@ -158,24 +250,12 @@
159251 return putArc( aquireNodeByPageId(from), aquireNodeByPageId(cat) );
160252 }
161253
162 - public Relationship putRoot(int root) {
163 - return putRoot( aquireNodeByPageId(root) );
164 - }
165 -
166254 public Relationship putArc(Node from, Node cat) {
167255 if ( from.getId() == cat.getId() ) return null;
168256 Relationship relationship = cat.createRelationshipTo( from, CategoryRelationships.CONTAINS );
169257 return relationship;
170258 }
171259
172 - public Relationship putRoot(Node root) {
173 - Node ref = graphDb.getReferenceNode();
174 - if (ref.getId() == root.getId()) return null;
175 -
176 - Relationship relationship = ref.createRelationshipTo( root, CategoryRelationships.CONTAINS );
177 - return relationship;
178 - }
179 -
180260 public Collection<Integer> getDescendants(int start) {
181261 Node n = getNodeByPageId(start);
182262 if ( n == null ) throw new IllegalArgumentException("page_id "+start+" not found");
@@ -244,40 +324,38 @@
245325 configuration = CollectionUtils.asMap( SystemUtils.loadProperties(u, null) );
246326 }
247327
248 - GraphDatabaseService graphDb = new EmbeddedGraphDatabase( args.getParameter(0), configuration );
249 - File tsv = new File(args.getParameter(1));
 328+ DataCursor<List<String>> cursor = null;
 329+ GraphDatabaseService graphDb = null;
 330+ IndexService indexer = null;
250331
251 - IndexService indexer = new LuceneIndexService(graphDb);
252 -
253 - /*
254 - DatabaseAccess db = new DatabaseSchema(null, dbInfo, null);
255 - db.open();
256 -
257 - db.executeUpdate("", "use "+database+";");
258 - */
259 - CatGraph graph = new CatGraph(graphDb, indexer);
260 -
261 - InputStreamReader rd = new InputStreamReader(new FileInputStream(tsv));
262 - ChunkingCursor cursor = new ChunkingCursor(new LineCursor(rd), CsvLineChunker.tsv);
263 -
264 - cursor.next(); //skip header in first line
265 -
266 - Transaction tx = graphDb.beginTx();
267 - try
268 - {
269 - System.out.println("loading arcs....");
270 - long t = System.currentTimeMillis();
271 - graph.loadArcs(cursor, 0, 1);
272 - System.out.println("loading arcs took "+(System.currentTimeMillis() - t)+"ms.");
273 -
 332+ try {
 333+ graphDb = new EmbeddedGraphDatabase( args.getParameter(0), configuration );
 334+ indexer = new LuceneIndexService(graphDb);
 335+
 336+ CatGraph graph = new CatGraph(graphDb, indexer);
 337+
 338+ if (args.getParameterCount()>1) {
 339+ File tsv = new File(args.getParameter(1));
 340+ InputStreamReader rd = new InputStreamReader(new FileInputStream(tsv));
 341+ cursor = new ChunkingCursor(new LineCursor(rd), CsvLineChunker.tsv);
 342+
 343+ cursor.next(); //skip header in first line
 344+
 345+ System.out.println("loading arcs....");
 346+ long t = System.currentTimeMillis();
 347+ graph.loadArcs(cursor, 0, 1);
 348+ System.out.println("loading arcs took "+(System.currentTimeMillis() - t)+"ms.");
 349+ }
 350+
274351 graph.run();
 352+
 353+ System.out.println( "done" );
275354 }
276355 finally
277356 {
278 - tx.finish();
279 - graphDb.shutdown();
 357+ if (indexer!=null) indexer.shutdown();
 358+ if (graphDb!=null) graphDb.shutdown();
 359+ System.exit(0);
280360 }
281 -
282 - System.out.println( "done" );
283361 }
284362 }

Status & tagging log