Index: trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py |
— | — | @@ -19,17 +19,19 @@ |
20 | 20 | import math |
21 | 21 | import commands |
22 | 22 | |
23 | | -import cgi # web queries |
24 | | -import re # regular expression matching |
25 | | -import gzip # process gzipped logs |
| 23 | +import cgi |
| 24 | +import re |
| 25 | +import gzip |
| 26 | +import os |
26 | 27 | |
27 | | -import MySQLdb # db access |
| 28 | +import MySQLdb |
28 | 29 | |
29 | 30 | import datetime |
30 | 31 | import Fundraiser_Tools.classes.DataLoader as DL |
31 | 32 | import Fundraiser_Tools.classes.Helper as Hlp |
32 | 33 | import Fundraiser_Tools.settings as projSet |
33 | 34 | import Fundraiser_Tools.classes.TimestampProcessor as TP |
| 35 | +import Fundraiser_Tools.classes.FundraiserDataHandler as FDH |
34 | 36 | |
35 | 37 | """ |
36 | 38 | |
— | — | @@ -38,35 +40,120 @@ |
39 | 41 | Base class for interacting with DataSource. Methods that are intended to be extended in derived classes include: |
40 | 42 | |
41 | 43 | METHODS: |
42 | | - |
| 44 | + |
| 45 | + copy_logs - copies logs from source to destination for processing |
43 | 46 | |
44 | 47 | """ |
45 | 48 | class DataMapper(object): |
46 | 49 | |
47 | | - _squid_log_directory_ = projSet.__project_home__ + 'logs/' |
48 | | - |
49 | 50 | """ |
50 | 51 | Copies mining logs from remote site |
51 | 52 | """ |
52 | | - def copy_logs(self): |
53 | | - return |
| 53 | + def copy_logs(self, type): |
| 54 | + |
| 55 | + # '/archive/udplogs' |
| 56 | + |
| 57 | + now = datetime.datetime.now() |
| 58 | + |
54 | 59 | |
| 60 | + if type == FDH._TESTTYPE_BANNER_: |
| 61 | + prefix = 'bannerImpressions-' |
| 62 | + elif type == FDH._TESTTYPE_LP_: |
| 63 | + prefix = 'landingpages-'  # NOTE(review): no else branch -- 'prefix' is undefined (NameError below) for any other test type; also 'type' shadows the builtin |
| 64 | + |
| 65 | + if now.hour > 12:  # NOTE(review): hour == 12 (noon) falls into the AM branch and hour 0 yields '0AM' -- confirm this matches the log naming convention |
| 66 | + filename = prefix + str(now.year) + '-' + str(now.month) + '-' + str(now.day) + '-' + str(now.hour - 12) + 'PM*' |
| 67 | + else: |
| 68 | + filename = prefix + str(now.year) + '-' + str(now.month) + '-' + str(now.day) + '-' + str(now.hour) + 'AM*' |
| 69 | + |
| 70 | + # filename = 'bannerImpressions-2011-05-27-11PM--25*'  # FIXME(review): leftover debug override disabled -- it unconditionally clobbered the computed filename above |
| 71 | + |
| 72 | + cmd = 'sftp ' + projSet.__user__ + '@' + projSet.__squid_log_server__ + ':' + projSet.__squid_log_home__ + filename + ' ' + projSet.__squid_log_local_home__ |
| 73 | + |
| 74 | + os.system(cmd) |
| 75 | + |
| 76 | + return filename |
| 77 | + |
| 78 | +""" |
| 79 | + |
| 80 | + CLASS :: FundraiserDataMapper |
| 81 | + |
| 82 | + Data mapper specific to the Wikimedia Fundraiser |
| 83 | + |
| 84 | + METHODS: |
| 85 | + |
| 86 | + mine_squid_impression_requests - mining banner impressions from squid logs |
| 87 | + mine_squid_landing_page_requests - mining landing page views from squid logs |
| 88 | + |
| 89 | +""" |
| 90 | +class FundraiserDataMapper(DataMapper): |
| 91 | + |
| 92 | + _db = None |
| 93 | + _cur = None |
| 94 | + |
| 95 | + _squid_log_directory_ = projSet.__project_home__ + 'logs/' |
| 96 | + _impression_table_name_ = 'banner_impressions' |
| 97 | + _landing_page_table_name_ = 'landing_page_requests' |
| 98 | + |
| 99 | + _BANNER_REQUEST_ = 0 |
| 100 | + _LP_REQUEST_ = 1 |
| 101 | + |
| 102 | + _BANNER_FIELDS_ = ' (start_timestamp, utm_source, referrer, country, lang, counts, on_minute) ' |
| 103 | + _LP_FIELDS_ = ' (start_timestamp, utm_source, utm_campaign, utm_medium, landing_page, page_url, referrer_url, browser, lang, country, project, ip, request_time) ' |
| 104 | + |
| 105 | + |
| 106 | + """ !! MODIFY -- use dataloaders! """ |
| 107 | + def _init_db(self): |
| 108 | + self._db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner', port=3307) |
| 109 | + self._cur = self._db.cursor() |
| 110 | + |
| 111 | + """ !! MODIFY -- use dataloaders! """ |
| 112 | + def _close_db(self): |
| 113 | + self._cur.close() |
| 114 | + self._db.close() |
| 115 | + |
| 116 | + |
| 117 | + |
| 118 | + def _clear_squid_records(self, start, request_type): |
| 119 | + |
| 120 | + |
| 121 | + """ NOTE(review): the previous range-sanity check (abs(time_diff) <= 1.0) was dropped in this change -- deletion is now keyed on the exact start_timestamp only; confirm this still protects records outside the log's hour """ |
| 122 | + timestamp = TP.timestamp_convert_format(start,1,2) |
| 123 | + |
| 124 | + if request_type == self._BANNER_REQUEST_: |
| 125 | + deleteStmnt = 'delete from ' + self._impression_table_name_ + ' where start_timestamp = \'' + timestamp + '\';'  # NOTE(review): prefer a parameterized query, e.g. cursor.execute(stmt, (timestamp,)) |
| 126 | + elif request_type == self._LP_REQUEST_: |
| 127 | + deleteStmnt = 'delete from ' + self._landing_page_table_name_ + ' where start_timestamp = \'' + timestamp + '\';' |
| 128 | + |
| 129 | + try: |
| 130 | + self._cur.execute(deleteStmnt) |
| 131 | + print >> sys.stdout, "Executed delete from impression: " + deleteStmnt |
| 132 | + except:  # NOTE(review): bare except hides real DB errors (and NameError if request_type matched neither branch) -- catch MySQLdb.Error instead |
| 133 | + print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt + "\nResuming insert ..." |
| 134 | + pass |
| 135 | + |
| 136 | + |
| 137 | + |
55 | 138 | """ |
56 | 139 | |
57 | 140 | """ |
58 | 141 | def mine_squid_impression_requests(self, logFileName): |
59 | 142 | |
60 | | - """ !! MODIFY -- use dataloaders! """ |
61 | | - db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner', port=3307) |
62 | | - cur = db.cursor() |
| 143 | + self._init_db() |
63 | 144 | |
64 | 145 | sltl = DL.SquidLogTableLoader() |
65 | 146 | itl = DL.ImpressionTableLoader() |
66 | | - |
| 147 | + |
| 148 | + """ Retrieve the log timestamp from the filename """ |
| 149 | + time_stamps = Hlp.get_timestamps(logFileName) |
| 150 | + |
| 151 | + start = time_stamps[0] |
| 152 | + end = time_stamps[1] |
| 153 | + start_timestamp_in = "convert(\'" + start + "\', datetime)" |
67 | 154 | curr_time = TP.timestamp_from_obj(datetime.datetime.now(),1,3) |
68 | | - |
69 | | - # Initialization - open the file |
70 | | - # logFileName = sys.argv[1]; |
| 155 | + |
| 156 | + """ Initialization - open the file |
| 157 | + logFileName = sys.argv[1]; """ |
71 | 158 | if (re.search('\.gz', logFileName)): |
72 | 159 | logFile = gzip.open(self._squid_log_directory_ + logFileName, 'r') |
73 | 160 | total_lines_in_file = float(commands.getstatusoutput('zgrep -c "" ' + self._squid_log_directory_ + logFileName)[1]) |
— | — | @@ -77,34 +164,15 @@ |
78 | 165 | queryIndex = 4; |
79 | 166 | |
80 | 167 | counts = Hlp.AutoVivification() |
81 | | - insertStmt = 'INSERT INTO impression (utm_source, referrer, country, lang, counts, on_minute) values ' |
| 168 | + insertStmt = 'INSERT INTO ' + self._impression_table_name_ + self._BANNER_FIELDS_ + ' values ' |
82 | 169 | |
83 | 170 | min_log = -1 |
84 | 171 | hr_change = 0 |
85 | 172 | clamp = 0 |
86 | 173 | |
87 | | - """ Clear the records for hour ahead of adding """ |
88 | | - time_stamps = Hlp.get_timestamps(logFileName) |
| 174 | + """ Clear the old records """ |
| 175 | + self._clear_squid_records(start, self._BANNER_REQUEST_) |
89 | 176 | |
90 | | - start = time_stamps[0] |
91 | | - end = time_stamps[1] |
92 | | - |
93 | | - # Ensure that the range is correct; otherwise abort - critical that outside records are not deleted |
94 | | - time_diff = Hlp.get_timestamps_diff(start, end) |
95 | | - |
96 | | - if math.fabs(time_diff) <= 1.0: |
97 | | - deleteStmnt = 'delete from impression where on_minute >= \'' + start + '\' and on_minute < \'' + end + '\';' |
98 | | - |
99 | | - try: |
100 | | - # cur.execute(deleteStmnt) |
101 | | - print >> sys.stdout, "Executed delete from impression: " + deleteStmnt |
102 | | - except: |
103 | | - print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt + "\nResuming insert ..." |
104 | | - pass |
105 | | - else: |
106 | | - print >> sys.stdout, "Could not execute delete statement, DIFF too large\ndiff = " + str(time_diff) + "\ntime_start = " + start + "\ntime_end = " + end + "\nResuming insert ..." |
107 | | - |
108 | | - |
109 | 177 | """ Add a row to the SquidLogTable """ |
110 | 178 | sltl.insert_row(type='banner_impression',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0') |
111 | 179 | |
— | — | @@ -116,11 +184,6 @@ |
117 | 185 | while (line != ''): |
118 | 186 | |
119 | 187 | lineArgs = line.split() |
120 | | - |
121 | | - # Filter out time data by minute -- if the time is not properly formatted skip the record |
122 | | - # 2010-11-12T20:56:43.237 |
123 | | - if line_count > 88364: |
124 | | - print line |
125 | 188 | |
126 | 189 | try: |
127 | 190 | time_stamp = lineArgs[2] |
— | — | @@ -227,12 +290,12 @@ |
228 | 291 | count = langCounts[lang] |
229 | 292 | |
230 | 293 | try: |
231 | | - val = '(\'' + banner + '\',\'' + project + '\',\'' + country + '\',\'' + lang + '\',' \ |
| 294 | + val = '(' + start_timestamp_in + ',\'' + banner + '\',\'' + project + '\',\'' + country + '\',\'' + lang + '\',' \ |
232 | 295 | + str(count) + ',' + time_stamp_in + ');' |
233 | 296 | |
234 | | - cur.execute(insertStmt + val) |
| 297 | + self._cur.execute(insertStmt + val) |
235 | 298 | except: |
236 | | - db.rollback() |
| 299 | + self._db.rollback() |
237 | 300 | sys.exit("Database Interface Exception - Could not execute statement:\n" + insertStmt + val) |
238 | 301 | |
239 | 302 | # Re-initialize counts |
— | — | @@ -246,21 +309,25 @@ |
247 | 310 | |
248 | 311 | sltl.update_table_row(type='banner_impression',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct=completion.__str__(),total_rows=line_count.__str__()) |
249 | 312 | |
250 | | - cur.close() |
251 | | - db.close() |
| 313 | + self._close_db() |
252 | 314 | |
253 | 315 | """ |
254 | 316 | |
255 | 317 | """ |
256 | 318 | def mine_squid_landing_page_requests(self, logFileName): |
257 | 319 | |
258 | | - """ !! MODIFY -- use dataloaders! """ |
259 | | - db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner', port=3307) |
260 | | - cur = db.cursor() |
| 320 | + self._init_db() |
261 | 321 | |
262 | 322 | sltl = DL.SquidLogTableLoader() |
263 | 323 | lptl = DL.LandingPageTableLoader() |
264 | 324 | |
| 325 | + |
| 326 | + """ Retrieve the log timestamp from the filename """ |
| 327 | + time_stamps = Hlp.get_timestamps(logFileName) |
| 328 | + |
| 329 | + start = time_stamps[0] |
| 330 | + end = time_stamps[1] |
| 331 | + start_timestamp_in = "convert(\'" + start + "\', datetime)" |
265 | 332 | curr_time = TP.timestamp_from_obj(datetime.datetime.now(),1,3) |
266 | 333 | |
267 | 334 | count_parse = 0 |
— | — | @@ -281,39 +348,18 @@ |
282 | 349 | |
283 | 350 | """ SQL Statements """ |
284 | 351 | |
285 | | - insertStmt_lp = 'INSERT INTO landing_page (utm_source, utm_campaign, utm_medium, landing_page,' + \ |
286 | | - 'page_url, referrer_url, browser, lang, country, project, ip, request_time) values ' |
| 352 | + insertStmt_lp = 'INSERT INTO ' + self._landing_page_table_name_ + self._LP_FIELDS_ + ' values ' |
287 | 353 | |
288 | | - """ Clear the records for hour ahead of adding """ |
289 | | - time_stamps = Hlp.get_timestamps(logFileName) |
| 354 | + """ Clear the old records """ |
| 355 | + self._clear_squid_records(start, self._LP_REQUEST_) |
290 | 356 | |
291 | | - start = time_stamps[0] |
292 | | - end = time_stamps[1] |
293 | | - |
294 | | - # Ensure that the range is correct; otherwise abort - critical that outside records are not deleted |
295 | | - time_diff = Hlp.get_timestamps_diff(start, end) |
296 | | - |
297 | | - if math.fabs(time_diff) <= 1.0: |
298 | | - deleteStmnt = 'delete from landing_page where request_time >= \'' + start + '\' and request_time < \'' + end + '\';' |
299 | | - |
300 | | - try: |
301 | | - # cur.execute(deleteStmnt) |
302 | | - print >> sys.stdout, "Executed delete from landing page: " + deleteStmnt |
303 | | - except: |
304 | | - print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt + "\nResuming insert ..." |
305 | | - pass |
306 | | - else: |
307 | | - print >> sys.stdout, "Could not execute delete statement, DIFF too large\ndiff = " + str(time_diff) + "\ntime_start = " + start + "\ntime_end = " + end + "\nResuming insert ..." |
308 | | - |
309 | | - |
| 357 | + """ Add a row to the SquidLogTable """ |
| 358 | + sltl.insert_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0') |
| 359 | + |
310 | 360 | count_correct = 0 |
311 | 361 | count_total = 0 |
312 | 362 | line_count = 0 |
313 | 363 | |
314 | | - """ Add a row to the SquidLogTable """ |
315 | | - sltl.insert_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0') |
316 | | - |
317 | | - |
318 | 364 | # PROCESS LOG FILE |
319 | 365 | # ================ |
320 | 366 | line = logFile.readline() |
— | — | @@ -462,7 +508,7 @@ |
463 | 509 | |
464 | 510 | except: |
465 | 511 | landing_page = 'NONE' |
466 | | - country = Hlp.localize_IP(cur, ip_add) |
| 512 | + country = Hlp.localize_IP(self._cur, ip_add) |
467 | 513 | |
468 | 514 | else: # ...wikimediafoundation.org/wiki/... |
469 | 515 | |
— | — | @@ -483,11 +529,11 @@ |
484 | 530 | country = landing_path[3] |
485 | 531 | |
486 | 532 | except: |
487 | | - country = Hlp.localize_IP(cur, ip_add) |
| 533 | + country = Hlp.localize_IP(self._cur, ip_add) |
488 | 534 | |
489 | 535 | # If country is confused with the language use the ip |
490 | 536 | if country == country.lower(): |
491 | | - country = Hlp.localize_IP(cur, ip_add) |
| 537 | + country = Hlp.localize_IP(self._cur, ip_add) |
492 | 538 | |
493 | 539 | # ensure fields exist |
494 | 540 | try: |
— | — | @@ -502,12 +548,12 @@ |
503 | 549 | |
504 | 550 | # INSERT INTO landing_page ('utm_source', 'utm_campaign', 'utm_medium', 'landing_page', 'page_url', 'lang', 'project', 'ip') values () |
505 | 551 | try: |
506 | | - val = '(\'' + utm_source + '\',\'' + utm_campaign + '\',\'' + utm_medium + '\',\'' + landing_page + \ |
| 552 | + val = '(' + start_timestamp_in + ',\'' + utm_source + '\',\'' + utm_campaign + '\',\'' + utm_medium + '\',\'' + landing_page + \ |
507 | 553 | '\',\'' + landing_url + '\',\'' + referrer_url + '\',\'' + browser + '\',\'' + source_lang + '\',\'' + country + '\',\'' \ |
508 | 554 | + project + '\',\'' + ip_add + '\',' + 'convert(\'' + timestamp_string + '\', datetime)' + ');' |
509 | 555 | |
510 | 556 | #print insertStmt + val |
511 | | - cur.execute(insertStmt_lp + val) |
| 557 | + self._cur.execute(insertStmt_lp + val) |
512 | 558 | |
513 | 559 | except: |
514 | 560 | print "Could not insert:\n" + insertStmt_lp + val |
— | — | @@ -520,7 +566,5 @@ |
521 | 567 | completion = float(line_count / total_lines_in_file) * 100.0 |
522 | 568 | sltl.update_table_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct=completion.__str__(),total_rows=line_count.__str__()) |
523 | 569 | |
524 | | - cur.close() |
525 | | - db.close() |
526 | | - |
| 570 | + self._close_db() |
527 | 571 | |