source: stamper/stamper/stamper.py@ 37:abe2ef05ce80

Last change on this file since 37:abe2ef05ce80 was 37:abe2ef05ce80, checked in by Borja Lopez <borja@…>, 10 years ago

Added feature to import stamps from a file:

stamps -i /path/to/stamps.json

The stamps file is read, stamps are loaded and then merged into the
main stamps stream, keeping proper order (by start, end dates)
and removing duplicated entries.

File size: 15.1 KB
RevLine 
[0]1
2import json
[7]3import re
[23]4import pygal
[26]5from datetime import datetime, date, timedelta
[37]6from os.path import expanduser, exists, islink, isdir
[30]7from os import symlink, remove
[19]8from collections import OrderedDict
[37]9from operator import itemgetter
[0]10
[7]11
# Default location of the stamps file and the formats used to parse/print
# dates found in stamps and in filters.
STAMPS_FILE = expanduser('~/.workstamps.json')
DATE_FORMAT = '%Y-%m-%d'
TIME_FORMAT = '%H:%M:%S'
DATETIME_FORMAT = '%Y-%m-%d %H:%M'
# Length of a working day, used to express totals as "work days"
HOURS_DAY = 8
SECS_DAY = HOURS_DAY * 60 * 60


class Stamper(object):
    """
    Keep track of work time using a list of "stamps" stored in a json file.

    Each stamp is a dict with the keys 'start', 'end', 'customer' and
    'action'. Stamps whose customer is None are "start" stamps: they only
    mark the moment the work began and have no end time.
    """

    def __init__(self, stamps_file=STAMPS_FILE):
        """
        Remember which file holds the stamps and make sure it exists.

        The stamps themselves are not read here, call load_stamps() for that.
        """
        # Use the path provided by the caller (it used to be ignored in
        # favour of the module default)
        self.stamps_file = stamps_file
        self.ensure_stamps_file()
        self.stamps = []

    def __json_load(self, filename):
        """
        Load the stamps from a file in json format, returning
        the parsed list.

        A file that does not contain valid json (an empty file, for example)
        produces an empty list.
        """
        with open(filename, 'r') as stamps_file:
            try:
                stamps = json.load(stamps_file)
            except ValueError:
                stamps = []
        return stamps

    def remove_duplicates(self):
        """
        Remove duplicated stamps from the stamps list.

        Two stamps are duplicates when they have the same keys and values,
        no matter the order of the keys. The first occurrence is kept and
        the relative order of the remaining stamps is preserved.
        """
        seen = set()
        unique = []
        for stamp in self.stamps:
            # keys are unique, so sorting the items never compares values
            fingerprint = tuple(sorted(stamp.items()))
            if fingerprint not in seen:
                seen.add(fingerprint)
                unique.append(stamp)
        self.stamps = unique

    def ensure_stamps_file(self):
        """
        Create an empty stamps file if it does not exist yet
        """
        if not exists(self.stamps_file):
            with open(self.stamps_file, 'w') as stamps_file:
                stamps_file.write('')

    def load_stamps(self):
        """
        Replace the stamps in memory with the contents of the stamps file
        """
        self.stamps = self.__json_load(self.stamps_file)

    def sort_stamps(self):
        """
        Sort all the stamps by start and end dates
        """
        # "start" stamps have no end time (None). Use an empty string for
        # them so they still sort first, without comparing None against
        # strings (which is an error in python 3)
        self.stamps = sorted(
            self.stamps,
            key=lambda stamp: (stamp['start'], stamp['end'] or ''))

    def save_stamps(self):
        """
        Write the stamps in memory to the stamps file, in json format
        """
        with open(self.stamps_file, 'w') as stamps_file:
            json.dump(self.stamps, stamps_file, indent=4)

    def stamp(self, start, end, customer, action):
        """
        Append a new stamp to the list of stamps (it is not saved to disk)
        """
        self.stamps.append({
            'start': start,
            'end': end,
            'customer': customer,
            'action': action,
        })

    def last_stamp(self, n=1):
        """
        return the stamp in position -n, that is, starting from the latest one
        and going back N positions in the list of stamps

        Returns None if there are no stamps at all.
        """
        if not self.stamps:
            return None
        return self.stamps[-n]

    def worktime(self, start, end):
        """
        Return the number of seconds between two dates, both given as
        strings in DATETIME_FORMAT.
        """
        elapsed = (datetime.strptime(end, DATETIME_FORMAT) -
                   datetime.strptime(start, DATETIME_FORMAT))
        # timedelta.seconds ignores whole days, total_seconds() does not
        return int(elapsed.total_seconds())

    @staticmethod
    def _in_range(start, filter_from, filter_to):
        """
        Tell if a start date falls in the range [filter_from, filter_to).
        Any of the limits can be None, meaning "no limit".
        """
        if filter_from and start < filter_from:
            return False
        if filter_to and start >= filter_to:
            return False
        return True

    def validate_filter(self, stamp_filter):
        """
        Validate a given filter. Filters can have the following notation:

        - %Y-%m-%d: Times recorded at a given date

        - %Y-%m-%d--%Y-%m-%d: Times recorded between two dates

        - *%Y-%m-%d: Times recorded up to a given date

        - %Y-%m-%d*: Times recorded from a given date

        - N...N[d|w|m|y]: Times recorded N...N days/weeks/months/years ago

        Important: all date comparisons are made on datetime objects, using
        00:00 as the time (first second of the given day). This means that
        for range filters, the first day is included, but the second day is not

        Returns a tuple (filter_from, filter_to) of datetime objects, where
        any of them can be None. A filter that cannot be understood prints
        a warning and returns (None, None). Malformed dates in range or "*"
        filters raise ValueError.
        """
        filter_from = None
        filter_to = None

        if stamp_filter is None:
            return filter_from, filter_to

        # The whole filter has to be "<digits><unit>" to be a relative one
        relative = re.match(r'(\d+)([dwmy])$', stamp_filter, re.IGNORECASE)

        if '--' in stamp_filter:
            first, second = stamp_filter.split('--', 1)
            filter_from = datetime.strptime(first, DATE_FORMAT)
            filter_to = datetime.strptime(second, DATE_FORMAT)

        elif stamp_filter.startswith('*'):
            filter_to = datetime.strptime(stamp_filter, '*' + DATE_FORMAT)

        elif stamp_filter.endswith('*'):
            filter_from = datetime.strptime(stamp_filter, DATE_FORMAT + '*')

        elif relative:
            number = int(relative.group(1))
            unit = relative.group(2).lower()
            today = date.today()

            if unit in ('d', 'w'):
                days = number * 7 if unit == 'w' else number
                past = today - timedelta(days=days)
                filter_from = datetime(past.year, past.month, past.day)

            elif unit == 'm':
                past = today
                # start travelling in time, back to N months ago. After
                # each step "past" is the last day of the previous month
                for _ in range(number):
                    past = past.replace(day=1) - timedelta(days=1)
                # Now use the year/month from the past + the current day
                # to set the proper date. If the current day does not
                # exist in that month (31st, 30th of february...) use the
                # last day of that month instead
                day = min(today.day, past.day)
                filter_from = datetime(past.year, past.month, day)

            else:
                year = today.year - number
                try:
                    filter_from = datetime(year, today.month, today.day)
                except ValueError:
                    if (today.month, today.day) != (2, 29):
                        raise
                    # today is feb 29th and the target year is not a leap
                    # year
                    filter_from = datetime(year, 2, 28)

        else:
            # maybe they are giving us a fixed date
            try:
                filter_from = datetime.strptime(stamp_filter, DATE_FORMAT)
            except ValueError:
                # nothing to be used as a filter, go on, printing a warning
                print('[warning] invalid date filter: ' + stamp_filter)
            else:
                filter_to = filter_from + timedelta(days=1)

        return filter_from, filter_to

    @property
    def customers(self):
        """
        List of the different customers found in the stamps, in order of
        appearance. "start" stamps (customer is None) are ignored.
        """
        customers = []
        for stamp in self.stamps:
            customer = stamp['customer']
            if customer is not None and customer not in customers:
                customers.append(customer)
        return customers

    def totals(self, filter_from=None, filter_to=None):
        """
        Return a dict mapping each customer to the seconds worked for it.

        Only the stamps whose start date is in [filter_from, filter_to)
        are taken into account.
        """
        totals = {}
        for stamp in self.stamps:
            customer = stamp['customer']
            # customer will be None for "start" stamps, having no end time
            if not customer:
                continue
            start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
            if not self._in_range(start, filter_from, filter_to):
                continue
            worktime = self.worktime(stamp['start'], stamp['end'])
            totals[customer] = totals.get(customer, 0) + worktime
        return totals

    def details(self, filter_customer=None, filter_from=None, filter_to=None):
        """
        Return the detailed information about the recorded times, per day.

        Returns a tuple of three ordered dicts, all of them using the day
        (DATE_FORMAT) as key:

        - details: list of printable lines, one per stamp
        - totals: time worked that day, as a printable string
        - total_customer: dict mapping customers to seconds worked that day

        Only stamps for filter_customer (if given) whose start date is in
        [filter_from, filter_to) are taken into account.
        """
        details = OrderedDict()
        totals = OrderedDict()
        total_customer = OrderedDict()
        for stamp in self.stamps:
            customer = stamp['customer']
            # avoid "start" stamps
            if not customer:
                continue
            if filter_customer and customer != filter_customer:
                # we are getting the details for only one customer, if this
                # stamp is not for that customer, simply move on and ignore
                # it
                continue
            start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
            if not self._in_range(start, filter_from, filter_to):
                continue
            start_day = start.strftime(DATE_FORMAT)
            worktime = self.worktime(stamp['start'], stamp['end'])
            details.setdefault(start_day, []).append(
                ' -> %(worktime)s %(customer)s %(action)s' % {
                    'worktime': str(timedelta(seconds=worktime)),
                    'customer': customer,
                    'action': stamp['action']
                })
            totals[start_day] = totals.get(start_day, 0) + worktime
            day_customers = total_customer.setdefault(start_day, {})
            day_customers[customer] = day_customers.get(customer, 0) + worktime
        for day in totals:
            totals[day] = str(timedelta(seconds=totals[day]))
        return details, totals, total_customer

    def timeline(self):
        """
        Print all the stamps, one per line, in the order they are stored
        """
        for stamp in self.stamps:
            if not stamp['customer']:
                print(stamp['start'] + ' start')
            else:
                print(' '.join([stamp['end'],
                                stamp['customer'],
                                stamp['action']]))

    def graph_stamps(self, customer=None, stamp_filter=None):
        """
        Generate charts with information from the stamps

        A bar chart with the hours worked per day is written to
        graphs/chart-<date>_<time>.svg and graphs/chart-latest.svg is
        updated to point to it.
        """
        filter_from, filter_to = self.validate_filter(stamp_filter)
        chart = pygal.Bar(title='Work hours per day',
                          range=(0, HOURS_DAY),
                          x_title='Days',
                          y_title='Work hours',
                          x_label_rotation=45)

        details, totals, totals_customers = self.details(customer,
                                                         filter_from,
                                                         filter_to)
        days = []
        values = {}
        for c in self.customers:
            values[c] = []

        # customers with some work done in the days shown in the chart
        found = []

        for day in details:
            for c in values:
                seconds = totals_customers[day].get(c, 0)
                if seconds and c not in found:
                    found.append(c)
                human = str(timedelta(seconds=seconds))
                # every customer gets a value per day (even if it is 0) so
                # the bars match the days in the x axis
                values[c].append({'value': seconds / 60.00 / 60.00,
                                  'label': day + ': ' + human})
            days.append(day)
        # a real list, map() returns an iterator in python 3
        chart.x_labels = [str(day) for day in days]

        if customer:
            # an unknown customer results in an empty serie
            chart.add(customer, values.get(customer, []))
        else:
            for c in found:
                chart.add(c, values[c])

        chart_name = 'chart-%s.svg' % datetime.today().strftime(
            '%Y-%m-%d_%H%M%S')
        chart_symlink = 'chart-latest.svg'
        chart.render_to_file('graphs/' + chart_name)
        if islink('graphs/' + chart_symlink):
            remove('graphs/' + chart_symlink)
        symlink(chart_name, 'graphs/' + chart_symlink)

    def show_stamps(self, customer=None, stamp_filter=None, verbose=False,
                    sum=False):
        """
        Print the time worked per customer.

        - customer: show only the time worked for this customer
        - stamp_filter: date filter, see validate_filter()
        - verbose: print the detail of each day before the totals
        - sum: print also the sum of all the reported times, both as
          hours:minutes:seconds and as work days of HOURS_DAY hours
        """
        filter_from, filter_to = self.validate_filter(stamp_filter)

        # If the user asks for verbose information, show it before the
        # totals (mimicing what the original stamp tool does)
        if verbose:
            details, day_totals, _ = self.details(customer,
                                                  filter_from,
                                                  filter_to)
            for day in details:
                print('------ %(day)s ------' % {'day': day})
                for line in details[day]:
                    print(line)
                print(' Total: %(total)s' % {'total': day_totals[day]})
            print('-' * 79)

        # now calculate the totals and show them
        totals = self.totals(filter_from, filter_to)
        if customer:
            reported = [(customer, totals.get(customer, 0))]
        else:
            reported = list(totals.items())

        # the "sum" parameter shadows the builtin, so add the seconds by hand
        seconds = 0
        for name, worked in reported:
            seconds += worked
            print(' %(customer)s: %(total)s' % {
                'customer': name,
                'total': timedelta(seconds=worked)})

        if sum and totals:
            print('------ Totals ------')
            for name, worked in reported:
                print(' %(day)s: %(total)s' % {'day': name, 'total': worked})
            total_minutes, sec = divmod(seconds, 60)
            hours, minutes = divmod(total_minutes, 60)
            total_days, remaining = divmod(seconds, SECS_DAY)
            remaining_hours, remaining_minutes = divmod(remaining // 60, 60)
            print('----- %d:%02d:%02d -----' % (hours, minutes, sec))
            print('--- %d days, remaining: %d:%02d (%d hours/day) ---' % (
                total_days, remaining_hours, remaining_minutes, HOURS_DAY
            ))

    def remove_stamps(self, n=1):
        """
        Remove up to n stamps back, asking for confirmation before delete

        The resulting list of stamps is saved to the stamps file.
        """
        # raw_input() is called input() in python 3
        try:
            ask = raw_input
        except NameError:
            ask = input

        for _ in range(n):
            stamp = self.last_stamp()
            if stamp is None:
                # no stamps left to remove
                break
            if not stamp['customer']:
                print(stamp['start'] + ' start')
            else:
                print(' '.join([stamp['end'],
                                stamp['customer'],
                                stamp['action']]))
            confirm = ''
            while confirm not in ['y', 'n']:
                confirm = ask('delete stamp? (y/n) ').lower()
            if confirm == 'y':
                self.stamps.pop()
            else:
                # if the user says no to the removal of an stamp, we cannot
                # keep deleting stamps after that one, as that could leave the
                # stamps in an inconsistent state.
                print('Aborting removal of stamps')
                break
        self.save_stamps()

    def import_stamps(self, filename):
        """
        Import the stamps from the given file into the main stamps list,
        merging them into the list (removing duplicated entries)

        The merged list is sorted and saved to the stamps file. Nothing is
        changed if the file does not exist, is a directory or has no stamps.
        """
        if not exists(filename):
            print('[error] ' + filename + ' does not exist')
            return
        if isdir(filename):
            print('[error] ' + filename + ' is a directory')
            return
        stamps = self.__json_load(filename)
        if not stamps:
            print('[warning] no stamps can be imported from ' + filename)
            return
        self.stamps.extend(stamps)
        self.remove_duplicates()
        self.sort_stamps()
        self.save_stamps()
        print('[warning] ' + str(len(stamps)) + ' stamps merged')
        print('[warning] remember to review the resulting stamps file')
Note: See TracBrowser for help on using the repository browser.