source: stamper/stamper/stamper.py@ 51:2744123f56bf

Last change on this file since 51:2744123f56bf was 43:b086c7d68163, checked in by Borja Lopez <borja@…>, 10 years ago

Put charts into ~/.workstamps-charts by default, ensuring the directory
exists when we create an instance of Stamper.

Cleaned up imports a bit (first imports of std library modules, then
external dependencies)

Use os.path.join to generate the paths to the chart files (instead of
building them manually by concatenating strings)

File size: 16.8 KB
Line 
1
2import json
3import re
4import os.path
5from datetime import datetime, date, timedelta
6from os import symlink, remove, makedirs
7from collections import OrderedDict
8from operator import itemgetter
9import pygal
10
11
# Default locations of the stamps database and of the rendered charts.
STAMPS_FILE = os.path.expanduser('~/.workstamps.json')
CHARTS_DIR = os.path.expanduser('~/.workstamps-charts')
# strptime/strftime formats used for filters and for stored stamps.
DATE_FORMAT = '%Y-%m-%d'
TIME_FORMAT = '%H:%M:%S'
DATETIME_FORMAT = '%Y-%m-%d %H:%M'
# Length of a working day, used to express totals as "working days".
HOURS_DAY = 8
SECS_DAY = HOURS_DAY * 60 * 60


class Stamper(object):
    """
    Record, query and report work time stamps stored in a json file.

    Each stamp is a dict with the keys 'start', 'end', 'customer' and
    'action'. Dates are strings in DATETIME_FORMAT. A stamp whose customer
    is None is a "start" stamp: it marks the beginning of a period of work
    that has not been closed yet, so it has no end time.
    """

    def __init__(self, stamps_file=STAMPS_FILE, charts_dir=CHARTS_DIR):
        """
        Remember where stamps and charts live, creating the stamps file
        and the charts directory if they do not exist yet. Stamps are not
        loaded here, call load_stamps() for that.
        """
        self.stamps_file = stamps_file
        self.charts_dir = charts_dir
        self.ensure_stamps_file()
        self.ensure_charts_dir()
        self.stamps = []

    def __json_load(self, filename):
        """
        Load the stamps from a file in json format, returning
        the parsed list (an empty list if the file has no valid json).
        """
        with open(filename, 'r') as stamps_file:
            try:
                stamps = json.load(stamps_file)
            except ValueError:
                # empty or corrupted file, behave as if there were no stamps
                stamps = []
        return stamps

    def remove_duplicates(self):
        """
        Remove duplicated stamps from the stamps list, keeping the first
        occurrence of each stamp and the relative order of the stamps.
        """
        seen = set()
        unique = []
        for stamp in self.stamps:
            # Sort the items so two equal stamps produce the same key no
            # matter the order in which their keys were inserted.
            key = tuple(sorted(stamp.items()))
            if key not in seen:
                seen.add(key)
                unique.append(stamp)
        self.stamps = unique

    def ensure_stamps_file(self):
        """
        Create an empty stamps file if it does not exist.
        """
        if not os.path.exists(self.stamps_file):
            with open(self.stamps_file, 'w') as stamps_file:
                stamps_file.write('')

    def ensure_charts_dir(self):
        """
        Create the charts directory if it does not exist.
        """
        if not os.path.exists(self.charts_dir):
            makedirs(self.charts_dir)

    def load_stamps(self):
        """
        Replace the in-memory stamps with the contents of the stamps file.
        """
        self.stamps = self.__json_load(self.stamps_file)

    def sort_stamps(self):
        """
        Sort all the stamps by start and end dates. Stamps without an end
        date ("start" stamps) go before stamps sharing the same start.
        """
        # None cannot be compared with a string, so the end date is
        # replaced by a (has_end, end) pair that is always comparable.
        self.stamps = sorted(
            self.stamps,
            key=lambda stamp: (stamp['start'],
                               stamp['end'] is not None,
                               stamp['end'] or ''))

    def save_stamps(self):
        """
        Write the in-memory stamps to the stamps file, in json format.
        """
        with open(self.stamps_file, 'w') as stamps_file:
            json.dump(self.stamps, stamps_file, indent=4)

    def stamp(self, start, end, customer, action):
        """
        Append a new stamp to the in-memory list (it is not saved to disk).
        """
        self.stamps.append({
            'start': start,
            'end': end,
            'customer': customer,
            'action': action,
        })

    def last_stamp(self, n=1):
        """
        return the stamp in position -n, that is, starting from the latest one
        and going back N positions in the list of stamps. Returns None if
        there is no stamp in that position.
        """
        try:
            return self.stamps[-n]
        except IndexError:
            return None

    def worktime(self, start, end):
        """
        Return the number of seconds between two dates given as strings
        in DATETIME_FORMAT.
        """
        delta = (datetime.strptime(end, DATETIME_FORMAT) -
                 datetime.strptime(start, DATETIME_FORMAT))
        # total_seconds() includes the days of the delta, timedelta.seconds
        # would silently drop them for periods of 24 hours or more.
        return int(delta.total_seconds())

    @staticmethod
    def _relative_start(today, number, unit):
        """
        Return the date `number` units before `today`, where unit is one
        of 'd' (days), 'w' (weeks), 'm' (months) or 'y' (years).
        """
        if unit == 'd':
            return today - timedelta(days=number)
        if unit == 'w':
            return today - timedelta(days=number * 7)
        if unit == 'm':
            past = today
            # travel back in time, one month per iteration, landing on the
            # last day of the previous month each time
            for _ in range(number):
                past = past.replace(day=1) - timedelta(days=1)
            # keep the current day of the month, unless the month we
            # landed on is too short for it (i.e. 31st -> February)
            return past.replace(day=min(today.day, past.day))
        year = today.year - number
        try:
            return today.replace(year=year)
        except ValueError:
            # February 29th on a year that is not a leap year. If the
            # year itself is out of range this raises ValueError again.
            return today.replace(year=year, day=28)

    def _parse_filter(self, stamp_filter, today):
        """
        Parse a filter into a (filter_from, filter_to) tuple of datetimes,
        raising ValueError/OverflowError if it cannot be parsed.
        """
        if '--' in stamp_filter:
            first, last = stamp_filter.split('--')
            return (datetime.strptime(first, DATE_FORMAT),
                    datetime.strptime(last, DATE_FORMAT))

        if stamp_filter.startswith('*'):
            return None, datetime.strptime(stamp_filter, '*' + DATE_FORMAT)

        if stamp_filter.endswith('*'):
            return datetime.strptime(stamp_filter, DATE_FORMAT + '*'), None

        relative = re.match(r'^(\d+)([dwmy])$', stamp_filter.lower())
        if relative:
            past = self._relative_start(
                today, int(relative.group(1)), relative.group(2))
            # build the datetime from the date, so it is exactly 00:00:00
            return datetime(past.year, past.month, past.day), None

        # maybe they are giving us a fixed date
        day = datetime.strptime(stamp_filter, DATE_FORMAT)
        return day, day + timedelta(days=1)

    def validate_filter(self, stamp_filter):
        """
        Validate a given filter. Filters can have the following notation:

        - %Y-%m-%d: Times recorded at a given date

        - %Y-%m-%d--%Y-%m-%d: Times recorded between two dates

        - *%Y-%m-%d: Times recorded up to a given date

        - %Y-%m-%d*: Times recorded from a given date

        - N...N[d|w|m|y]: Times recorded N...N days/weeks/months/years ago

        Returns a (filter_from, filter_to) tuple of datetime objects, where
        any of them can be None (no limit). If the filter is not valid a
        warning is printed and (None, None) is returned.

        Important: all date comparisons are made on datetime objects, using
        00:00 as the time (first second of the given day). This means that
        for range filters, the first day is included, but the second day is not
        """
        if stamp_filter is None:
            return None, None
        try:
            return self._parse_filter(stamp_filter, date.today())
        except (ValueError, OverflowError):
            # nothing to be used as a filter, go on, printing a warning
            print('[warning] invalid date filter: ' + stamp_filter)
            return None, None

    @staticmethod
    def _in_range(start, filter_from, filter_to):
        """
        Tell if a start datetime is inside the [filter_from, filter_to)
        period, where None means there is no limit on that side.
        """
        if filter_from is not None and start < filter_from:
            return False
        # filter_to is the first second of the excluded day, so a stamp
        # starting exactly at that time is already out of the period
        if filter_to is not None and start >= filter_to:
            return False
        return True

    @property
    def customers(self):
        """
        List of the different customers found in the stamps, in order of
        appearance ("start" stamps, having no customer, are ignored).
        """
        customers = []
        for stamp in self.stamps:
            customer = stamp['customer']
            if customer is not None and customer not in customers:
                customers.append(customer)
        return customers

    def totals(self, filter_from=None, filter_to=None):
        """
        Return a dict with the seconds worked for each customer, taking
        into account only the stamps started inside the given period.
        """
        totals = {}
        for stamp in self.stamps:
            customer = stamp['customer']
            # customer will be None for "start" stamps, having no end time
            if not customer:
                continue
            start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
            if not self._in_range(start, filter_from, filter_to):
                continue
            totals[customer] = (totals.get(customer, 0) +
                                self.worktime(stamp['start'], stamp['end']))
        return totals

    def details(self, filter_customer=None, filter_from=None, filter_to=None):
        """
        Return three ordered dicts, all of them using the day (%Y-%m-%d) in
        which the stamps started as the key:

        - details: list of 'worktime customer action' lines for the day

        - totals: time worked in the day, as a string (H:MM:SS)

        - total_customer: dict with the seconds worked for each customer

        Only stamps for filter_customer (if given) that started inside the
        given period are taken into account.
        """
        details = OrderedDict()
        totals = OrderedDict()
        total_customer = OrderedDict()
        for stamp in self.stamps:
            customer = stamp['customer']
            # avoid "start" stamps
            if not customer:
                continue
            if filter_customer and customer != filter_customer:
                # we are getting the details for only one customer, if this
                # stamp is not for that customer, simply move on and ignore
                # it
                continue
            start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
            if not self._in_range(start, filter_from, filter_to):
                continue
            start_day = start.strftime('%Y-%m-%d')
            worktime = self.worktime(stamp['start'], stamp['end'])
            details.setdefault(start_day, []).append(
                '%(worktime)s %(customer)s %(action)s' % {
                    'worktime': str(timedelta(seconds=worktime)),
                    'customer': customer,
                    'action': stamp['action']
                })
            totals[start_day] = totals.get(start_day, 0) + worktime
            day_customers = total_customer.setdefault(start_day, {})
            day_customers[customer] = day_customers.get(customer, 0) + worktime
        for day in totals:
            totals[day] = str(timedelta(seconds=totals[day]))
        return details, totals, total_customer

    def timeline(self, customer=None, stamp_filter=None):
        """
        Print the stamps as a timeline of start/end events, optionally
        only for one customer and/or for the period set by stamp_filter.
        """
        filter_from, filter_to = self.validate_filter(stamp_filter)
        for stamp in self.stamps:
            start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
            if not self._in_range(start, filter_from, filter_to):
                continue

            if not stamp['customer']:
                # "start" stamp, shown only in the full timeline
                if customer is None:
                    print(stamp['start'] + ' start')
            else:
                if customer and customer != stamp['customer']:
                    continue
                if customer:
                    # "start" stamps are hidden when filtering by customer,
                    # so show the start time of this stamp instead
                    print(stamp['start'] + ' start')
                print(' '.join([stamp['end'],
                                stamp['customer'],
                                stamp['action']]))

    def graph_stamps(self, customer=None, stamp_filter=None):
        """
        Generate charts with information from the stamps

        A bar chart with the hours worked per day is rendered into the
        charts directory, and the chart-latest.svg symlink is updated to
        point to it.
        """
        filter_from, filter_to = self.validate_filter(stamp_filter)
        chart = pygal.Bar(title='Work hours per day',
                          range=(0, HOURS_DAY),
                          x_title='Days',
                          y_title='Work hours',
                          x_label_rotation=45)

        details, totals, totals_customers = self.details(customer,
                                                         filter_from,
                                                         filter_to)
        days = []
        # ordered, so the series are always added in the same order
        values = OrderedDict((c, []) for c in self.customers)

        # customers with some time recorded in the days of the report
        found = []

        for day in details:
            for c in values:
                seconds = totals_customers[day].get(c, 0)
                if seconds and c not in found:
                    found.append(c)
                human = str(timedelta(seconds=seconds))
                values[c].append({'value': seconds/60.00/60.00,
                                  'label': day + ': ' + human})
            days.append(day)
        # a real list, map() returns a one-shot iterator in python 3
        chart.x_labels = [str(day) for day in days]

        if customer:
            # the customer may not have any stamps at all
            chart.add(customer, values.get(customer, []))
        else:
            for c in found:
                chart.add(c, values[c])

        chart_name = 'chart-%s.svg' % datetime.today().strftime(
            '%Y-%m-%d_%H%M%S')
        chart_symlink = 'chart-latest.svg'
        chart_path = os.path.join(self.charts_dir, chart_name)
        chart_symlink_path = os.path.join(self.charts_dir, chart_symlink)

        chart.render_to_file(chart_path)
        print('Rendered chart: ' + chart_path)
        if os.path.islink(chart_symlink_path):
            remove(chart_symlink_path)
        # relative target, as both files live in the same directory
        symlink(chart_name, chart_symlink_path)
        print('Updated latest chart: ' + chart_symlink_path)

    @staticmethod
    def _summary_lines(seconds):
        """
        Return the two lines summarizing an amount of seconds: as
        hours:minutes:seconds and as working days (of HOURS_DAY hours)
        plus the remaining hours:minutes.
        """
        total_minutes, secs = divmod(seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        total_days, remaining = divmod(seconds, SECS_DAY)
        remaining_hours, remaining_minutes = divmod(remaining // 60, 60)
        return [
            '----- %d:%02d:%02d -----' % (hours, minutes, secs),
            '--- %d days, remaining: %d:%02d (%d hours/day) ---' % (
                total_days, remaining_hours, remaining_minutes, HOURS_DAY),
        ]

    def show_stamps(self, customer=None, stamp_filter=None, verbose=False,
                    sum=False):
        """
        Print the time worked for each customer (or only for the given
        customer) in the period set by stamp_filter.

        If verbose is set, the details of each day are printed first. If
        sum is set, the sum of all the reported totals is printed last.
        """
        filter_from, filter_to = self.validate_filter(stamp_filter)

        # If the user asks for verbose information, show it before the
        # totals (mimicing what the original stamp tool does)
        if verbose:
            details, day_totals, total_customer = self.details(customer,
                                                               filter_from,
                                                               filter_to)
            for day in details:
                print('------ %(day)s ------' % {'day': day})
                for line in details[day]:
                    print(line)
                customer_day_totals = []
                for tc in total_customer[day]:
                    tc_total = str(timedelta(seconds=total_customer[day][tc]))
                    customer_day_totals.append(tc+': '+tc_total)
                print(', '.join(customer_day_totals))
                if len(customer_day_totals) > 1:
                    # if there are multiple customers in the report, show the
                    # daily totals
                    print('daily total: %(total)s' % {
                        'total': day_totals[day]})
            print('-'*79)

        # now calculate the totals and show them
        totals = self.totals(filter_from, filter_to)
        if customer:
            reported = {customer: totals.get(customer, 0)}
        else:
            reported = totals
        for name in reported:
            print(' %(customer)s: %(total)s' % {
                'customer': name,
                'total': timedelta(seconds=reported[name])})

        if sum:
            # the builtin sum() is shadowed by the parameter, add by hand
            grand_total = 0
            if reported:
                print('------ Totals ------')
            for name in reported:
                # these per-customer totals are shown in seconds
                print(' %(day)s: %(total)s' % {'day': name,
                                               'total': reported[name]})
                grand_total += reported[name]
            for line in self._summary_lines(grand_total):
                print(line)

    def remove_stamps(self, n=1):
        """
        Remove up to n stamps back, asking for confirmation before delete
        """
        try:
            ask = raw_input
        except NameError:
            # python 3, where raw_input has been renamed to input
            ask = input
        for i in range(n):
            stamp = self.last_stamp()
            if stamp is None:
                # no more stamps left to remove
                break
            if not stamp['customer']:
                print(stamp['start'] + ' start')
            else:
                print(' '.join([stamp['end'],
                                stamp['customer'],
                                stamp['action']]))
            confirm = ''
            while confirm.lower() not in ['y', 'n']:
                confirm = ask('delete stamp? (y/n) ')
            confirm = confirm.lower()
            if confirm == 'y':
                self.stamps.pop()
            else:
                # if the user says no to the removal of an stamp, we cannot
                # keep deleting stamps after that one, as that could leave the
                # stamps in an inconsistent state.
                print('Aborting removal of stamps')
                break
        self.save_stamps()

    def import_stamps(self, filename):
        """
        Import the stamps from the given file into the main stamps list,
        merging them into the list (removing duplicated entries)
        """
        if not os.path.exists(filename):
            print('[error] ' + filename + ' does not exist')
            return
        if os.path.isdir(filename):
            print('[error] ' + filename + ' is a directory')
            return
        stamps = self.__json_load(filename)
        if not stamps:
            print('[warning] no stamps can be imported from ' + filename)
            return
        self.stamps.extend(stamps)
        self.remove_duplicates()
        self.sort_stamps()
        self.save_stamps()
        print('[warning] ' + str(len(stamps)) + ' stamps merged')
        print('[warning] remember to review the resulting stamps file')
Note: See TracBrowser for help on using the repository browser.