r89233 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r89232‎ | r89233 | r89234 >
Date:22:47, 31 May 2011
Author:rfaulk
Status:deferred
Tags:
Comment:
added log copying functionality as copy_logs method in DataMapper class
Modified paths:
  • /trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py (modified) (history)

Diff [purge]

Index: trunk/fundraiser-statistics/fundraiser-scripts/classes/DataMapper.py
@@ -19,17 +19,19 @@
2020 import math
2121 import commands
2222
23 -import cgi # web queries
24 -import re # regular expression matching
25 -import gzip # process gzipped logs
 23+import cgi
 24+import re
 25+import gzip
 26+import os
2627
27 -import MySQLdb # db access
 28+import MySQLdb
2829
2930 import datetime
3031 import Fundraiser_Tools.classes.DataLoader as DL
3132 import Fundraiser_Tools.classes.Helper as Hlp
3233 import Fundraiser_Tools.settings as projSet
3334 import Fundraiser_Tools.classes.TimestampProcessor as TP
 35+import Fundraiser_Tools.classes.FundraiserDataHandler as FDH
3436
3537 """
3638
@@ -38,35 +40,120 @@
3941 Base class for interacting with DataSource. Methods that are intended to be extended in derived classes include:
4042
4143 METHODS:
42 -
 44+
 45+ copy_logs - copies logs from source to destination for processing
4346
4447 """
4548 class DataMapper(object):
4649
47 - _squid_log_directory_ = projSet.__project_home__ + 'logs/'
48 -
4950 """
5051 Copies mining logs from remote site
5152 """
52 - def copy_logs(self):
53 - return
 53+ def copy_logs(self, type):
 54+
 55+ # '/archive/udplogs'
 56+
 57+ now = datetime.datetime.now()
 58+
5459
 60+ if type == FDH._TESTTYPE_BANNER_:
 61+ prefix = 'bannerImpressions-'
 62+ elif type == FDH._TESTTYPE_LP_:
 63+ prefix = 'landingpages-'
 64+
 65+ if now.hour > 12:
 66+ filename = prefix + str(now.year) + '-' + str(now.month) + '-' + str(now.day) + '-' + str(now.hour - 12) + 'PM*'
 67+ else:
 68+ filename = prefix + str(now.year) + '-' + str(now.month) + '-' + str(now.day) + '-' + str(now.hour) + 'AM*'
 69+
 70+ filename = 'bannerImpressions-2011-05-27-11PM--25*'
 71+
 72+ cmd = 'sftp ' + projSet.__user__ + '@' + projSet.__squid_log_server__ + ':' + projSet.__squid_log_home__ + filename + ' ' + projSet.__squid_log_local_home__
 73+
 74+ os.system(cmd)
 75+
 76+ return filename
 77+
 78+"""
 79+
 80+ CLASS :: FundraiserDataMapper
 81+
 82+ Data mapper specific to the Wikimedia Fundraiser
 83+
 84+ METHODS:
 85+
 86+ mine_squid_impression_requests - mining banner impressions from squid logs
 87+ mine_squid_landing_page_requests - mining landing page views from squid logs
 88+
 89+"""
 90+class FundraiserDataMapper(DataMapper):
 91+
 92+ _db = None
 93+ _cur = None
 94+
 95+ _squid_log_directory_ = projSet.__project_home__ + 'logs/'
 96+ _impression_table_name_ = 'banner_impressions'
 97+ _landing_page_table_name_ = 'landing_page_requests'
 98+
 99+ _BANNER_REQUEST_ = 0
 100+ _LP_REQUEST_ = 1
 101+
 102+ _BANNER_FIELDS_ = ' (start_timestamp, utm_source, referrer, country, lang, counts, on_minute) '
 103+ _LP_FIELDS_ = ' (start_timestamp, utm_source, utm_campaign, utm_medium, landing_page, page_url, referrer_url, browser, lang, country, project, ip, request_time) '
 104+
 105+
 106+ """ !! MODIFY -- use dataloaders! """
 107+ def _init_db(self):
 108+ self._db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner', port=3307)
 109+ self._cur = self._db.cursor()
 110+
 111+ """ !! MODIFY -- use dataloaders! """
 112+ def _close_db(self):
 113+ self._cur.close()
 114+ self._db.close()
 115+
 116+
 117+
 118+ def _clear_squid_records(self, start, request_type):
 119+
 120+
 121+ """ Ensure that the range is correct; otherwise abort - critical that outside records are not deleted """
 122+ timestamp = TP.timestamp_convert_format(start,1,2)
 123+
 124+ if request_type == self._BANNER_REQUEST_:
 125+ deleteStmnt = 'delete from ' + self._impression_table_name_ + ' where start_timestamp = \'' + timestamp + '\';'
 126+ elif request_type == self._LP_REQUEST_:
 127+ deleteStmnt = 'delete from ' + self._landing_page_table_name_ + ' where start_timestamp = \'' + timestamp + '\';'
 128+
 129+ try:
 130+ self._cur.execute(deleteStmnt)
 131+ print >> sys.stdout, "Executed delete from impression: " + deleteStmnt
 132+ except:
 133+ print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt + "\nResuming insert ..."
 134+ pass
 135+
 136+
 137+
55138 """
56139
57140 """
58141 def mine_squid_impression_requests(self, logFileName):
59142
60 - """ !! MODIFY -- use dataloaders! """
61 - db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner', port=3307)
62 - cur = db.cursor()
 143+ self._init_db()
63144
64145 sltl = DL.SquidLogTableLoader()
65146 itl = DL.ImpressionTableLoader()
66 -
 147+
 148+ """ Retrieve the log timestamp from the filename """
 149+ time_stamps = Hlp.get_timestamps(logFileName)
 150+
 151+ start = time_stamps[0]
 152+ end = time_stamps[1]
 153+ start_timestamp_in = "convert(\'" + start + "\', datetime)"
67154 curr_time = TP.timestamp_from_obj(datetime.datetime.now(),1,3)
68 -
69 - # Initialization - open the file
70 - # logFileName = sys.argv[1];
 155+
 156+ """ Initialization - open the file
 157+ logFileName = sys.argv[1]; """
71158 if (re.search('\.gz', logFileName)):
72159 logFile = gzip.open(self._squid_log_directory_ + logFileName, 'r')
73160 total_lines_in_file = float(commands.getstatusoutput('zgrep -c "" ' + self._squid_log_directory_ + logFileName)[1])
@@ -77,34 +164,15 @@
78165 queryIndex = 4;
79166
80167 counts = Hlp.AutoVivification()
81 - insertStmt = 'INSERT INTO impression (utm_source, referrer, country, lang, counts, on_minute) values '
 168+ insertStmt = 'INSERT INTO ' + self._impression_table_name_ + self._BANNER_FIELDS_ + ' values '
82169
83170 min_log = -1
84171 hr_change = 0
85172 clamp = 0
86173
87 - """ Clear the records for hour ahead of adding """
88 - time_stamps = Hlp.get_timestamps(logFileName)
 174+ """ Clear the old records """
 175+ self._clear_squid_records(start, self._BANNER_REQUEST_)
89176
90 - start = time_stamps[0]
91 - end = time_stamps[1]
92 -
93 - # Ensure that the range is correct; otherwise abort - critical that outside records are not deleted
94 - time_diff = Hlp.get_timestamps_diff(start, end)
95 -
96 - if math.fabs(time_diff) <= 1.0:
97 - deleteStmnt = 'delete from impression where on_minute >= \'' + start + '\' and on_minute < \'' + end + '\';'
98 -
99 - try:
100 - # cur.execute(deleteStmnt)
101 - print >> sys.stdout, "Executed delete from impression: " + deleteStmnt
102 - except:
103 - print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt + "\nResuming insert ..."
104 - pass
105 - else:
106 - print >> sys.stdout, "Could not execute delete statement, DIFF too large\ndiff = " + str(time_diff) + "\ntime_start = " + start + "\ntime_end = " + end + "\nResuming insert ..."
107 -
108 -
109177 """ Add a row to the SquidLogTable """
110178 sltl.insert_row(type='banner_impression',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0')
111179
@@ -116,11 +184,6 @@
117185 while (line != ''):
118186
119187 lineArgs = line.split()
120 -
121 - # Filter out time data by minute -- if the time is not properly formatted skip the record
122 - # 2010-11-12T20:56:43.237
123 - if line_count > 88364:
124 - print line
125188
126189 try:
127190 time_stamp = lineArgs[2]
@@ -227,12 +290,12 @@
228291 count = langCounts[lang]
229292
230293 try:
231 - val = '(\'' + banner + '\',\'' + project + '\',\'' + country + '\',\'' + lang + '\',' \
 294+ val = '(' + start_timestamp_in + ',\'' + banner + '\',\'' + project + '\',\'' + country + '\',\'' + lang + '\',' \
232295 + str(count) + ',' + time_stamp_in + ');'
233296
234 - cur.execute(insertStmt + val)
 297+ self._cur.execute(insertStmt + val)
235298 except:
236 - db.rollback()
 299+ self._db.rollback()
237300 sys.exit("Database Interface Exception - Could not execute statement:\n" + insertStmt + val)
238301
239302 # Re-initialize counts
@@ -246,21 +309,25 @@
247310
248311 sltl.update_table_row(type='banner_impression',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct=completion.__str__(),total_rows=line_count.__str__())
249312
250 - cur.close()
251 - db.close()
 313+ self._close_db()
252314
253315 """
254316
255317 """
256318 def mine_squid_landing_page_requests(self, logFileName):
257319
258 - """ !! MODIFY -- use dataloaders! """
259 - db = MySQLdb.connect(host='127.0.0.1', user='rfaulk', db='faulkner', port=3307)
260 - cur = db.cursor()
 320+ self._init_db()
261321
262322 sltl = DL.SquidLogTableLoader()
263323 lptl = DL.LandingPageTableLoader()
264324
 325+
 326+ """ Retrieve the log timestamp from the filename """
 327+ time_stamps = Hlp.get_timestamps(logFileName)
 328+
 329+ start = time_stamps[0]
 330+ end = time_stamps[1]
 331+ start_timestamp_in = "convert(\'" + start + "\', datetime)"
265332 curr_time = TP.timestamp_from_obj(datetime.datetime.now(),1,3)
266333
267334 count_parse = 0
@@ -281,39 +348,18 @@
282349
283350 """ SQL Statements """
284351
285 - insertStmt_lp = 'INSERT INTO landing_page (utm_source, utm_campaign, utm_medium, landing_page,' + \
286 - 'page_url, referrer_url, browser, lang, country, project, ip, request_time) values '
 352+ insertStmt_lp = 'INSERT INTO ' + self._landing_page_table_name_ + self._LP_FIELDS_ + ' values '
287353
288 - """ Clear the records for hour ahead of adding """
289 - time_stamps = Hlp.get_timestamps(logFileName)
 354+ """ Clear the old records """
 355+ self._clear_squid_records(start, self._LP_REQUEST_)
290356
291 - start = time_stamps[0]
292 - end = time_stamps[1]
293 -
294 - # Ensure that the range is correct; otherwise abort - critical that outside records are not deleted
295 - time_diff = Hlp.get_timestamps_diff(start, end)
296 -
297 - if math.fabs(time_diff) <= 1.0:
298 - deleteStmnt = 'delete from landing_page where request_time >= \'' + start + '\' and request_time < \'' + end + '\';'
299 -
300 - try:
301 - # cur.execute(deleteStmnt)
302 - print >> sys.stdout, "Executed delete from landing page: " + deleteStmnt
303 - except:
304 - print >> sys.stderr, "Could not execute delete:\n" + deleteStmnt + "\nResuming insert ..."
305 - pass
306 - else:
307 - print >> sys.stdout, "Could not execute delete statement, DIFF too large\ndiff = " + str(time_diff) + "\ntime_start = " + start + "\ntime_end = " + end + "\nResuming insert ..."
308 -
309 -
 357+ """ Add a row to the SquidLogTable """
 358+ sltl.insert_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0')
 359+
310360 count_correct = 0
311361 count_total = 0
312362 line_count = 0
313363
314 - """ Add a row to the SquidLogTable """
315 - sltl.insert_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct='0.0',total_rows='0')
316 -
317 -
318364 # PROCESS LOG FILE
319365 # ================
320366 line = logFile.readline()
@@ -462,7 +508,7 @@
463509
464510 except:
465511 landing_page = 'NONE'
466 - country = Hlp.localize_IP(cur, ip_add)
 512+ country = Hlp.localize_IP(self._cur, ip_add)
467513
468514 else: # ...wikimediafoundation.org/wiki/...
469515
@@ -483,11 +529,11 @@
484530 country = landing_path[3]
485531
486532 except:
487 - country = Hlp.localize_IP(cur, ip_add)
 533+ country = Hlp.localize_IP(self._cur, ip_add)
488534
489535 # If country is confused with the language use the ip
490536 if country == country.lower():
491 - country = Hlp.localize_IP(cur, ip_add)
 537+ country = Hlp.localize_IP(self._cur, ip_add)
492538
493539 # ensure fields exist
494540 try:
@@ -502,12 +548,12 @@
503549
504550 # INSERT INTO landing_page ('utm_source', 'utm_campaign', 'utm_medium', 'landing_page', 'page_url', 'lang', 'project', 'ip') values ()
505551 try:
506 - val = '(\'' + utm_source + '\',\'' + utm_campaign + '\',\'' + utm_medium + '\',\'' + landing_page + \
 552+ val = '(' + start_timestamp_in + ',\'' + utm_source + '\',\'' + utm_campaign + '\',\'' + utm_medium + '\',\'' + landing_page + \
507553 '\',\'' + landing_url + '\',\'' + referrer_url + '\',\'' + browser + '\',\'' + source_lang + '\',\'' + country + '\',\'' \
508554 + project + '\',\'' + ip_add + '\',' + 'convert(\'' + timestamp_string + '\', datetime)' + ');'
509555
510556 #print insertStmt + val
511 - cur.execute(insertStmt_lp + val)
 557+ self._cur.execute(insertStmt_lp + val)
512558
513559 except:
514560 print "Could not insert:\n" + insertStmt_lp + val
@@ -520,7 +566,5 @@
521567 completion = float(line_count / total_lines_in_file) * 100.0
522568 sltl.update_table_row(type='lp_view',log_copy_time=curr_time,start_time=start,end_time=end,log_completion_pct=completion.__str__(),total_rows=line_count.__str__())
523569
524 - cur.close()
525 - db.close()
526 -
 570+ self._close_db()
527571

Status & tagging log