source: stamper/stamper/stamper.py@ 38:fb741bfe9e1d

Last change on this file since 38:fb741bfe9e1d was 38:fb741bfe9e1d, checked in by Borja Lopez <borja@…>, 10 years ago

The timeline feature accepts now date-based filters:

  • Show the timeline for the last week:

stamps -t 1w

  • Show the timeline for august, 2013:

stamps -t 2013-08-01--2013-09-01

And so on.

File size: 15.7 KB
Line 
1
2import json
3import re
4import pygal
5from datetime import datetime, date, timedelta
6from os.path import expanduser, exists, islink, isdir
7from os import symlink, remove
8from collections import OrderedDict
9from operator import itemgetter
10
11
12STAMPS_FILE = expanduser('~/.workstamps.json')
13DATE_FORMAT = '%Y-%m-%d'
14TIME_FORMAT = '%H:%M:%S'
15DATETIME_FORMAT = '%Y-%m-%d %H:%M'
16HOURS_DAY = 8
17SECS_DAY = HOURS_DAY * 60 * 60
18
19
20class Stamper(object):
21
22 def __init__(self, stamps_file=STAMPS_FILE):
23 self.stamps_file = STAMPS_FILE
24 self.ensure_stamps_file()
25 self.stamps = []
26
27 def __json_load(self, filename):
28 """
29 Load the stamps from a file in json format, returning
30 the parsed list.
31 """
32 with open(filename, 'r') as stamps_file:
33 try:
34 stamps = json.load(stamps_file)
35 except ValueError:
36 stamps = []
37 return stamps
38
39 def remove_duplicates(self):
40 """
41 Remove duplicated stamps from the stamps list
42 """
43 stamps = [dict(t) for t in set(
44 [tuple(d.items()) for d in self.stamps])]
45 self.stamps = stamps
46
47 def ensure_stamps_file(self):
48 if not exists(self.stamps_file):
49 with open(self.stamps_file, 'w') as stamps_file:
50 stamps_file.write('')
51
52 def load_stamps(self):
53 self.stamps = self.__json_load(self.stamps_file)
54
55 def sort_stamps(self):
56 """
57 Sort all the stamps by start and end dates
58 """
59 self.stamps = sorted(self.stamps, key=itemgetter('start', 'end'))
60
61 def save_stamps(self):
62 with open(self.stamps_file, 'w') as stamps_file:
63 json.dump(self.stamps, stamps_file, indent=4)
64
65 def stamp(self, start, end, customer, action):
66 self.stamps.append({
67 'start': start,
68 'end': end,
69 'customer': customer,
70 'action': action,
71 })
72
73 def last_stamp(self, n=1):
74 """
75 return the stamp in position -n, that is, starting from the latest one
76 and going back N positions in the list of stamps
77 """
78 if not self.stamps:
79 return None
80 return self.stamps[-n]
81
82 def worktime(self, start, end):
83 worktime = (datetime.strptime(end, DATETIME_FORMAT) -
84 datetime.strptime(start, DATETIME_FORMAT))
85 return worktime.seconds
86
87 def validate_filter(self, stamp_filter):
88 """
89 Validate a given filter. Filters can have the following notation:
90
91 - %Y-%m-%d: Times recorded at a given date
92
93 - %Y-%m-%d--%Y-%m-%d: Times recorded between two dates
94
95 - *%Y-%m-%d: Times recorded up to a given date
96
97 - %Y-%m-%d*: Times recorded from a given date
98
99 - N...N[d|w|m|y]: Times recorded N...N days/weeks/months/years ago
100
101 Important: all date comparisons are made on datetime objects, using
102 00:00 as the time (first second of the given day). This means that
103 for range filters, the first day is included, but the second day is not
104 """
105 filter_from = None
106 filter_to = None
107
108 if stamp_filter is None:
109 return filter_from, filter_to
110
111 if '--' in stamp_filter:
112 filter_from, filter_to = stamp_filter.split('--')
113 filter_from = datetime.strptime(filter_from, DATE_FORMAT)
114 filter_to = datetime.strptime(filter_to, DATE_FORMAT)
115
116 elif stamp_filter.startswith('*'):
117 filter_to = datetime.strptime(stamp_filter, '*'+DATE_FORMAT)
118 filter_to = filter_to.replace(hour=0, minute=0, second=0)
119
120 elif stamp_filter.endswith('*'):
121 filter_from = datetime.strptime(stamp_filter, DATE_FORMAT+'*')
122 filter_from = filter_from.replace(hour=0, minute=0, second=0)
123
124 elif re.search(r'(\d+[dD]{1})', stamp_filter):
125 number = int(stamp_filter.lower().replace('d', ''))
126 delta = timedelta(days=number)
127 filter_from = datetime.today() - delta
128 filter_from = filter_from.replace(hour=0, minute=0, second=0)
129
130 elif re.search(r'(\d+[wW]{1})', stamp_filter):
131 number = int(stamp_filter.lower().replace('w', '')) * 7
132 delta = timedelta(days=number)
133 filter_from = datetime.today() - delta
134 filter_from = filter_from.replace(hour=0, minute=0, second=0)
135
136 elif re.search(r'(\d+[mM]{1})', stamp_filter):
137 number = int(stamp_filter.lower().replace('m', ''))
138 past = date.today()
139 # start travelling in time, back to N months ago
140 for n in range(number):
141 past = past.replace(day=1) - timedelta(days=1)
142 # Now use the year/month from the past + the current day to set
143 # the proper date
144 filter_from = datetime(past.year, past.month, date.today().day)
145
146 elif re.search(r'(\d+[yY]{1})', stamp_filter):
147 number = int(stamp_filter.lower().replace('y', ''))
148 today = date.today()
149 filter_from = datetime(today.year - number, today.month, today.day)
150
151 else:
152 # maybe they are giving us a fixed date
153 try:
154 filter_from = datetime.strptime(stamp_filter, DATE_FORMAT)
155 except:
156 # nothing to be used as a filter, go on, printing a warning
157 print('[warning] invalid date filter: ' + stamp_filter)
158 else:
159 filter_from = filter_from.replace(hour=0, minute=0, second=0)
160 filter_to = filter_from + timedelta(days=1)
161
162 return filter_from, filter_to
163
164 @property
165 def customers(self):
166 customers = []
167 for stamp in self.stamps:
168 if stamp['customer'] not in customers:
169 customers.append(stamp['customer'])
170 customers.remove(None)
171 return customers
172
173 def totals(self, filter_from=None, filter_to=None):
174 totals = {}
175 for stamp in self.stamps:
176 customer = stamp['customer']
177 # customer will be None for "start" stamps, having no end time
178 if customer:
179 start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
180 end = datetime.strptime(stamp['end'], DATETIME_FORMAT)
181 if filter_from and start < filter_from:
182 # if there is a filter setting a starting date for the
183 # report and the current stamp is from an earlier date, do
184 # not add it to the totals
185 continue
186 if filter_to and start > filter_to:
187 # similar for the end date
188 continue
189 if customer not in totals:
190 totals[customer] = 0
191 totals[customer] += self.worktime(stamp['start'], stamp['end'])
192 return totals
193
194 def details(self, filter_customer=None, filter_from=None, filter_to=None):
195 details = OrderedDict()
196 totals = OrderedDict()
197 total_customer = OrderedDict()
198 for stamp in self.stamps:
199 customer = stamp['customer']
200 if customer:
201 if filter_customer and customer != filter_customer:
202 # we are getting the details for only one customer, if this
203 # stamp is not for that customer, simply move on and ignore
204 # it
205 continue
206 start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
207 start_day = start.strftime('%Y-%m-%d')
208 end = datetime.strptime(stamp['end'], DATETIME_FORMAT)
209 if filter_from and start < filter_from:
210 # if there is a filter setting a starting date for the
211 # report and the current stamp is from an earlier date, do
212 # not add it to the totals
213 continue
214 if filter_to and start > filter_to:
215 # similar for the end date
216 continue
217 # avoid "start" stamps
218 if start_day not in details:
219 details[start_day] = []
220 if start_day not in totals:
221 totals[start_day] = 0
222 worktime = self.worktime(stamp['start'], stamp['end'])
223 details[start_day].append(
224 ' -> %(worktime)s %(customer)s %(action)s' % {
225 'worktime': str(timedelta(seconds=worktime)),
226 'customer': customer,
227 'action': stamp['action']
228 })
229 totals[start_day] += worktime
230 if start_day not in total_customer:
231 total_customer[start_day] = {}
232 if customer not in total_customer[start_day]:
233 total_customer[start_day][customer] = 0
234 total_customer[start_day][customer] += worktime
235 for day in totals:
236 totals[day] = str(timedelta(seconds=totals[day]))
237 return details, totals, total_customer
238
239 def timeline(self, stamp_filter=None):
240 filter_from, filter_to = self.validate_filter(stamp_filter)
241 for stamp in self.stamps:
242 customer = stamp['customer']
243 start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
244 start_day = start.strftime('%Y-%m-%d')
245 if filter_from and start < filter_from:
246 # if there is a filter setting a starting date for the
247 # report and the current stamp is from an earlier date, do
248 # not add it to the totals
249 continue
250 if filter_to and start > filter_to:
251 # similar for the end date
252 continue
253 if not stamp['customer']:
254 print(stamp['start'] + ' start')
255 else:
256 print(' '.join([stamp['end'],
257 stamp['customer'],
258 stamp['action']]))
259
260 def graph_stamps(self, customer=None, stamp_filter=None):
261 """
262 Generate charts with information from the stamps
263 """
264 filter_from, filter_to = self.validate_filter(stamp_filter)
265 chart = pygal.Bar(title='Work hours per day',
266 range=(0, HOURS_DAY),
267 x_title='Days',
268 y_title='Work hours',
269 x_label_rotation=45)
270
271 details, totals, totals_customers = self.details(customer,
272 filter_from,
273 filter_to)
274 days = []
275 values = {}
276 for c in self.customers:
277 values[c] = []
278
279 found = []
280
281 for day in details:
282 for c in values:
283 seconds = totals_customers[day].get(c, 0)
284 if seconds and c not in found:
285 found.append(c)
286 human = timedelta(seconds=seconds).__str__()
287 values[c].append({'value': seconds/60.00/60.00,
288 'label': day + ': ' + human})
289 days.append(day)
290 chart.x_labels = map(str, days)
291
292 if customer:
293 chart.add(customer, values[customer])
294 else:
295 for c in found:
296 chart.add(c, values[c])
297
298 chart_name = 'chart-%s.svg' % datetime.today().strftime(
299 '%Y-%m-%d_%H%M%S')
300 chart_symlink = 'chart-latest.svg'
301 chart.render_to_file('graphs/' + chart_name)
302 if islink('graphs/'+ chart_symlink):
303 remove('graphs/'+ chart_symlink)
304 symlink(chart_name, 'graphs/'+ chart_symlink)
305
306 def show_stamps(self, customer=None, stamp_filter=None, verbose=False,
307 sum=False):
308 filter_from, filter_to = self.validate_filter(stamp_filter)
309
310 # If the user asks for verbose information, show it before the
311 # totals (mimicing what the original stamp tool does)
312 if verbose:
313 details, totals, total_customer = self.details(customer,
314 filter_from,
315 filter_to)
316 for day in details:
317 print('------ %(day)s ------' % {'day': day})
318 for line in details[day]:
319 print(line)
320 print(' Total: %(total)s' % {'total': totals[day]})
321 print '-'*79
322
323 # now calculate the totals and show them
324 totals = self.totals(filter_from, filter_to)
325 if customer:
326 seconds=totals.get(customer, 0)
327 total = timedelta(seconds=totals.get(customer, 0))
328 print(' %(customer)s: %(total)s' % {'customer': customer,
329 'total': total})
330 else:
331 for c in totals:
332 seconds=totals[c]
333 total = timedelta(seconds=totals[c])
334 print(' %(customer)s: %(total)s' % {'customer': c,
335 'total': total})
336
337 if sum:
338 sum_tot = ''
339 if totals:
340 print('------ Totals ------' % {'day': day})
341 for day, tot in totals.iteritems():
342 print(' %(day)s: %(total)s' % {'day': day, 'total': tot})
343 sum_tot = "%(total)s %(new)s" % {
344 'total': sum_tot,
345 'new': total
346 }
347 totalSecs, sec = divmod(seconds, 60)
348 hr, min = divmod(totalSecs, 60)
349 totalDays, remaining = divmod(seconds, SECS_DAY)
350 remainingMin, remainingSec = divmod(remaining, (60))
351 remainingHr, remainingMin = divmod(remainingMin, (60))
352 print('----- %d:%02d:%02d -----' % (hr, min, sec))
353 print('--- %d days, remaining: %d:%02d (%d hours/day) ---' % (
354 totalDays, remainingHr, remainingMin, HOURS_DAY
355 ))
356
357 def remove_stamps(self, n=1):
358 """
359 Remove up to n stamps back, asking for confirmation before delete
360 """
361 for i in range(n):
362 stamp = self.last_stamp()
363 if not stamp['customer']:
364 print(stamp['start'] + ' start')
365 else:
366 print(' '.join([stamp['end'],
367 stamp['customer'],
368 stamp['action']]))
369 confirm = ''
370 while confirm.lower() not in ['y', 'n']:
371 confirm = raw_input('delete stamp? (y/n) ')
372 confirm = confirm.lower()
373 if confirm == 'y':
374 self.stamps.pop()
375 else:
376 # if the user says no to the removal of an stamp, we cannot
377 # keep deleting stamps after that one, as that could leave the
378 # stamps in an inconsistent state.
379 print('Aborting removal of stamps')
380 break
381 self.save_stamps()
382
383 def import_stamps(self, filename):
384 """
385 Import the stamps from the given file into the main stamps list,
386 merging them into the list (removing duplicated entries)
387 """
388 if not exists(filename):
389 print('[error] ' + filename + 'does not exist')
390 return
391 if isdir(filename):
392 print('[error] ' + filename + 'is a directory')
393 return
394 stamps = self.__json_load(filename)
395 if not stamps:
396 print('[warning] no stamps can be imported from ' + filename)
397 return
398 self.stamps.extend(stamps)
399 self.remove_duplicates()
400 self.sort_stamps()
401 self.save_stamps()
402 print('[warning] ' + str(len(stamps)) + ' stamps merged')
403 print('[warning] remember to review the resulting stamps file')
Note: See TracBrowser for help on using the repository browser.