Index: trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java |
— | — | @@ -9,7 +9,6 @@ |
10 | 10 | import java.sql.SQLException; |
11 | 11 | import java.util.ArrayList; |
12 | 12 | import java.util.Collection; |
13 | | -import java.util.Iterator; |
14 | 13 | import java.util.List; |
15 | 14 | import java.util.Map; |
16 | 15 | |
— | — | @@ -33,6 +32,7 @@ |
34 | 33 | import de.brightbyte.data.cursor.DataCursor; |
35 | 34 | import de.brightbyte.db.DatabaseAccess; |
36 | 35 | import de.brightbyte.db.DatabaseUtil; |
| 36 | +import de.brightbyte.db.DatabaseDataSet.Cursor; |
37 | 37 | import de.brightbyte.io.ChunkingCursor; |
38 | 38 | import de.brightbyte.io.LineCursor; |
39 | 39 | import de.brightbyte.io.Output; |
— | — | @@ -43,6 +43,70 @@ |
44 | 44 | import de.brightbyte.util.SystemUtils; |
45 | 45 | |
46 | 46 | public class CatGraph extends ConsoleApp { |
| 47 | + public class ListElementPairCursor implements |
| 48 | + DataCursor<Pair<Integer, Integer>> { |
| 49 | + |
| 50 | + private DataCursor<? extends List<?>> cursor; |
| 51 | + private int aField; |
| 52 | + private int bField; |
| 53 | + |
| 54 | + public ListElementPairCursor(DataCursor<? extends List<?>> cursor, int aField, int bField) { |
| 55 | + this.cursor = cursor; |
| 56 | + this.aField = aField; |
| 57 | + this.bField = bField; |
| 58 | + } |
| 59 | + |
| 60 | + public void close() { |
| 61 | + cursor.close(); |
| 62 | + } |
| 63 | + |
| 64 | + public Pair<Integer, Integer> next() throws PersistenceException { |
| 65 | + List<?> row = cursor.next(); |
| 66 | + if ( row == null ) return null; |
| 67 | + |
| 68 | + int a = DatabaseUtil.asInt( row.get(aField) ); |
| 69 | + int b = DatabaseUtil.asInt( row.get(bField) ); |
| 70 | + |
| 71 | + return new Pair<Integer, Integer>(a, b); |
| 72 | + } |
| 73 | + |
| 74 | + } |
| 75 | + |
| 76 | + public class ResultSetPairCursor implements |
| 77 | + DataCursor<Pair<Integer, Integer>> { |
| 78 | + |
| 79 | + private ResultSet cursor; |
| 80 | + private int aField; |
| 81 | + private int bField; |
| 82 | + |
| 83 | + public ResultSetPairCursor(ResultSet cursor, int aField, int bField) { |
| 84 | + this.cursor = cursor; |
| 85 | + this.aField = aField; |
| 86 | + this.bField = bField; |
| 87 | + } |
| 88 | + |
| 89 | + public void close() { |
| 90 | + try { |
| 91 | + cursor.close(); |
| 92 | + } catch (SQLException e) { |
| 93 | + //ignore silently |
| 94 | + } |
| 95 | + } |
| 96 | + |
| 97 | + public Pair<Integer, Integer> next() throws PersistenceException { |
| 98 | + try { |
| 99 | + if (!cursor.next()) return null; |
| 100 | + |
| 101 | + int a = DatabaseUtil.asInt( cursor.getObject(aField) ); |
| 102 | + int b = DatabaseUtil.asInt( cursor.getObject(bField) ); |
| 103 | + |
| 104 | + return new Pair<Integer, Integer>( a, b ); |
| 105 | + } catch (SQLException e) { |
| 106 | + throw new PersistenceException(); |
| 107 | + } |
| 108 | + } |
| 109 | + } |
| 110 | + |
47 | 111 | protected class Descendants implements Command { |
48 | 112 | |
49 | 113 | private int start; |
— | — | @@ -63,56 +127,71 @@ |
64 | 128 | |
65 | 129 | private GraphDatabaseService graphDb; |
66 | 130 | private IndexService indexer; |
| 131 | + private long chunkSize = 100000; |
67 | 132 | |
68 | 133 | public CatGraph(GraphDatabaseService graphDb, IndexService indexer) { |
69 | 134 | this.graphDb = graphDb; |
70 | 135 | this.indexer = indexer; |
71 | 136 | } |
72 | 137 | |
73 | | - public void loadArcs(DatabaseAccess db, String sql, int fromCol, int toCol) throws SQLException { |
74 | | - ResultSet rs = db.executeQuery("load graph", sql); |
75 | | - while (rs.next()) { |
76 | | - int from = rs.getInt(fromCol); |
77 | | - int to = rs.getInt(toCol); |
78 | | - |
79 | | - putArc(from ,to); |
| 138 | + public void loadArcs(DatabaseAccess db, String sql, int fromCol, int toCol) throws PersistenceException { |
| 139 | + try { |
| 140 | + ResultSet rs = db.executeQuery("load graph", sql); |
| 141 | + loadArcs( new ResultSetPairCursor(rs, fromCol, toCol) ); |
| 142 | + } catch (SQLException e) { |
| 143 | + throw new PersistenceException(e); |
80 | 144 | } |
81 | 145 | } |
82 | 146 | |
83 | 147 | public void loadArcs(DataCursor<? extends List<?>> args, int fromCol, int toCol) throws PersistenceException { |
84 | | - ChunkedProgressRateTracker progressTracker = new ChunkedProgressRateTracker("arcs"); |
85 | | - |
86 | | - List<?> row ; |
87 | | - while ((row = args.next()) != null) { |
88 | | - int from = DatabaseUtil.asInt( row.get(fromCol) ); |
89 | | - int to = DatabaseUtil.asInt( row.get(toCol) ); |
90 | | - |
91 | | - putArc(from ,to); |
92 | | - |
93 | | - progressTracker.step(); |
94 | | - if ( progressTracker.chunkIf(10000, 10) ) { |
95 | | - out.println(progressTracker); |
96 | | - } |
97 | | - } |
| 148 | + loadArcs( new ListElementPairCursor(args, fromCol, toCol) ); |
98 | 149 | } |
99 | 150 | |
100 | 151 | public void loadArcs(DataCursor<Pair<Integer, Integer>> args) throws PersistenceException { |
101 | 152 | ChunkedProgressRateTracker progressTracker = new ChunkedProgressRateTracker("arcs"); |
102 | 153 | |
103 | | - Pair<Integer, Integer> row ; |
104 | | - while ((row = args.next()) != null) { |
105 | | - int from = row.getA(); |
106 | | - int to = row.getB(); |
| 154 | + Transaction tx = null; |
| 155 | + boolean done = false; |
| 156 | + try { |
| 157 | + Pair<Integer, Integer> row ; |
| 158 | + while ((row = args.next()) != null) { |
| 159 | + int from = row.getA(); |
| 160 | + int to = row.getB(); |
| 161 | + |
| 162 | + if (tx==null) tx = graphDb.beginTx(); |
| 163 | + |
| 164 | + putArc(from ,to); |
| 165 | + |
| 166 | + progressTracker.step(); |
| 167 | + //out.println("adding "+from+" -> "+to+" (#"+progressTracker.getCurrentChunkSize()+")"); |
| 168 | + |
| 169 | + if ( progressTracker.chunkIf(chunkSize , 10) ) { |
| 170 | + if (tx!=null) { |
| 171 | + long t = System.currentTimeMillis(); |
| 172 | + out.println("committing..."); |
| 173 | + tx.success(); |
| 174 | + tx.finish(); |
| 175 | + tx = null; |
| 176 | + out.println("commit took "+(System.currentTimeMillis() - t)+"ms."); |
| 177 | + } |
| 178 | + |
| 179 | + out.println(progressTracker); |
| 180 | + } |
| 181 | + } |
107 | 182 | |
108 | | - putArc(from ,to); |
109 | | - |
110 | | - progressTracker.step(); |
111 | | - if ( progressTracker.chunkIf(10000, 10) ) { |
112 | | - out.println(progressTracker); |
| 183 | + done = true; |
| 184 | + } finally { |
| 185 | + if ( tx != null) { |
| 186 | + if (done) tx.success(); |
| 187 | + else tx.failure(); |
| 188 | + |
| 189 | + tx.finish(); |
113 | 190 | } |
114 | 191 | } |
| 192 | + |
115 | 193 | } |
116 | 194 | |
| 195 | + /* |
117 | 196 | public void loadRoots(DatabaseAccess db, String sql) throws SQLException { |
118 | 197 | ResultSet rs = db.executeQuery("load graph", sql); |
119 | 198 | while (rs.next()) { |
— | — | @@ -137,6 +216,19 @@ |
138 | 217 | else return n; |
139 | 218 | } |
140 | 219 | |
| 220 | + public Relationship putRoot(int root) { |
| 221 | + return putRoot( aquireNodeByPageId(root) ); |
| 222 | + } |
| 223 | + |
| 224 | + public Relationship putRoot(Node root) { |
| 225 | + Node ref = graphDb.getReferenceNode(); |
| 226 | + if (ref.getId() == root.getId()) return null; |
| 227 | + |
| 228 | + Relationship relationship = ref.createRelationshipTo( root, CategoryRelationships.CONTAINS ); |
| 229 | + return relationship; |
| 230 | + } |
| 231 | +*/ |
| 232 | + |
141 | 233 | public Node getNodeByPageId(int pageId) { |
142 | 234 | return indexer.getSingleNode("page_id", pageId); |
143 | 235 | } |
— | — | @@ -158,24 +250,12 @@ |
159 | 251 | return putArc( aquireNodeByPageId(from), aquireNodeByPageId(cat) ); |
160 | 252 | } |
161 | 253 | |
162 | | - public Relationship putRoot(int root) { |
163 | | - return putRoot( aquireNodeByPageId(root) ); |
164 | | - } |
165 | | - |
166 | 254 | public Relationship putArc(Node from, Node cat) { |
167 | 255 | if ( from.getId() == cat.getId() ) return null; |
168 | 256 | Relationship relationship = cat.createRelationshipTo( from, CategoryRelationships.CONTAINS ); |
169 | 257 | return relationship; |
170 | 258 | } |
171 | 259 | |
172 | | - public Relationship putRoot(Node root) { |
173 | | - Node ref = graphDb.getReferenceNode(); |
174 | | - if (ref.getId() == root.getId()) return null; |
175 | | - |
176 | | - Relationship relationship = ref.createRelationshipTo( root, CategoryRelationships.CONTAINS ); |
177 | | - return relationship; |
178 | | - } |
179 | | - |
180 | 260 | public Collection<Integer> getDescendants(int start) { |
181 | 261 | Node n = getNodeByPageId(start); |
182 | 262 | if ( n == null ) throw new IllegalArgumentException("page_id "+start+" not found"); |
— | — | @@ -244,40 +324,38 @@ |
245 | 325 | configuration = CollectionUtils.asMap( SystemUtils.loadProperties(u, null) ); |
246 | 326 | } |
247 | 327 | |
248 | | - GraphDatabaseService graphDb = new EmbeddedGraphDatabase( args.getParameter(0), configuration ); |
249 | | - File tsv = new File(args.getParameter(1)); |
| 328 | + DataCursor<List<String>> cursor = null; |
| 329 | + GraphDatabaseService graphDb = null; |
| 330 | + IndexService indexer = null; |
250 | 331 | |
251 | | - IndexService indexer = new LuceneIndexService(graphDb); |
252 | | - |
253 | | - /* |
254 | | - DatabaseAccess db = new DatabaseSchema(null, dbInfo, null); |
255 | | - db.open(); |
256 | | - |
257 | | - db.executeUpdate("", "use "+database+";"); |
258 | | - */ |
259 | | - CatGraph graph = new CatGraph(graphDb, indexer); |
260 | | - |
261 | | - InputStreamReader rd = new InputStreamReader(new FileInputStream(tsv)); |
262 | | - ChunkingCursor cursor = new ChunkingCursor(new LineCursor(rd), CsvLineChunker.tsv); |
263 | | - |
264 | | - cursor.next(); //skip header in first line |
265 | | - |
266 | | - Transaction tx = graphDb.beginTx(); |
267 | | - try |
268 | | - { |
269 | | - System.out.println("loading arcs...."); |
270 | | - long t = System.currentTimeMillis(); |
271 | | - graph.loadArcs(cursor, 0, 1); |
272 | | - System.out.println("loading arcs took "+(System.currentTimeMillis() - t)+"ms."); |
273 | | - |
| 332 | + try { |
| 333 | + graphDb = new EmbeddedGraphDatabase( args.getParameter(0), configuration ); |
| 334 | + indexer = new LuceneIndexService(graphDb); |
| 335 | + |
| 336 | + CatGraph graph = new CatGraph(graphDb, indexer); |
| 337 | + |
| 338 | + if (args.getParameterCount()>1) { |
| 339 | + File tsv = new File(args.getParameter(1)); |
| 340 | + InputStreamReader rd = new InputStreamReader(new FileInputStream(tsv)); |
| 341 | + cursor = new ChunkingCursor(new LineCursor(rd), CsvLineChunker.tsv); |
| 342 | + |
| 343 | + cursor.next(); //skip header in first line |
| 344 | + |
| 345 | + System.out.println("loading arcs...."); |
| 346 | + long t = System.currentTimeMillis(); |
| 347 | + graph.loadArcs(cursor, 0, 1); |
| 348 | + System.out.println("loading arcs took "+(System.currentTimeMillis() - t)+"ms."); |
| 349 | + } |
| 350 | + |
274 | 351 | graph.run(); |
| 352 | + |
| 353 | + System.out.println( "done" ); |
275 | 354 | } |
276 | 355 | finally |
277 | 356 | { |
278 | | - tx.finish(); |
279 | | - graphDb.shutdown(); |
| 357 | + if (indexer!=null) indexer.shutdown(); |
| 358 | + if (graphDb!=null) graphDb.shutdown(); |
| 359 | + System.exit(0); |
280 | 360 | } |
281 | | - |
282 | | - System.out.println( "done" ); |
283 | 361 | } |
284 | 362 | } |