source: stamper/stamper/stamper.py@ 37:abe2ef05ce80

Last change on this file since 37:abe2ef05ce80 was 37:abe2ef05ce80, checked in by Borja Lopez <borja@…>, 10 years ago

Added feature to import stamps from a file:

stamps -i /path/to/stamps.json

The stamps file is read, stamps are loaded and then merged into the
main stamps stream, keeping proper order (by start, end dates)
and removing duplicated entries.

File size: 15.1 KB
Line 
1
2import json
3import re
4import pygal
5from datetime import datetime, date, timedelta
6from os.path import expanduser, exists, islink, isdir
7from os import symlink, remove
8from collections import OrderedDict
9from operator import itemgetter
10
11
12STAMPS_FILE = expanduser('~/.workstamps.json')
13DATE_FORMAT = '%Y-%m-%d'
14TIME_FORMAT = '%H:%M:%S'
15DATETIME_FORMAT = '%Y-%m-%d %H:%M'
16HOURS_DAY = 8
17SECS_DAY = HOURS_DAY * 60 * 60
18
19
20class Stamper(object):
21
22 def __init__(self, stamps_file=STAMPS_FILE):
23 self.stamps_file = STAMPS_FILE
24 self.ensure_stamps_file()
25 self.stamps = []
26
27 def __json_load(self, filename):
28 """
29 Load the stamps from a file in json format, returning
30 the parsed list.
31 """
32 with open(filename, 'r') as stamps_file:
33 try:
34 stamps = json.load(stamps_file)
35 except ValueError:
36 stamps = []
37 return stamps
38
39 def remove_duplicates(self):
40 """
41 Remove duplicated stamps from the stamps list
42 """
43 stamps = [dict(t) for t in set(
44 [tuple(d.items()) for d in self.stamps])]
45 self.stamps = stamps
46
47 def ensure_stamps_file(self):
48 if not exists(self.stamps_file):
49 with open(self.stamps_file, 'w') as stamps_file:
50 stamps_file.write('')
51
52 def load_stamps(self):
53 self.stamps = self.__json_load(self.stamps_file)
54
55 def sort_stamps(self):
56 """
57 Sort all the stamps by start and end dates
58 """
59 self.stamps = sorted(self.stamps, key=itemgetter('start', 'end'))
60
61 def save_stamps(self):
62 with open(self.stamps_file, 'w') as stamps_file:
63 json.dump(self.stamps, stamps_file, indent=4)
64
65 def stamp(self, start, end, customer, action):
66 self.stamps.append({
67 'start': start,
68 'end': end,
69 'customer': customer,
70 'action': action,
71 })
72
73 def last_stamp(self, n=1):
74 """
75 return the stamp in position -n, that is, starting from the latest one
76 and going back N positions in the list of stamps
77 """
78 if not self.stamps:
79 return None
80 return self.stamps[-n]
81
82 def worktime(self, start, end):
83 worktime = (datetime.strptime(end, DATETIME_FORMAT) -
84 datetime.strptime(start, DATETIME_FORMAT))
85 return worktime.seconds
86
87 def validate_filter(self, stamp_filter):
88 """
89 Validate a given filter. Filters can have the following notation:
90
91 - %Y-%m-%d: Times recorded at a given date
92
93 - %Y-%m-%d--%Y-%m-%d: Times recorded between two dates
94
95 - *%Y-%m-%d: Times recorded up to a given date
96
97 - %Y-%m-%d*: Times recorded from a given date
98
99 - N...N[d|w|m|y]: Times recorded N...N days/weeks/months/years ago
100
101 Important: all date comparisons are made on datetime objects, using
102 00:00 as the time (first second of the given day). This means that
103 for range filters, the first day is included, but the second day is not
104 """
105 filter_from = None
106 filter_to = None
107
108 if stamp_filter is None:
109 return filter_from, filter_to
110
111 if '--' in stamp_filter:
112 filter_from, filter_to = stamp_filter.split('--')
113 filter_from = datetime.strptime(filter_from, DATE_FORMAT)
114 filter_to = datetime.strptime(filter_to, DATE_FORMAT)
115
116 elif stamp_filter.startswith('*'):
117 filter_to = datetime.strptime(stamp_filter, '*'+DATE_FORMAT)
118 filter_to = filter_to.replace(hour=0, minute=0, second=0)
119
120 elif stamp_filter.endswith('*'):
121 filter_from = datetime.strptime(stamp_filter, DATE_FORMAT+'*')
122 filter_from = filter_from.replace(hour=0, minute=0, second=0)
123
124 elif re.search(r'(\d+[dD]{1})', stamp_filter):
125 number = int(stamp_filter.lower().replace('d', ''))
126 delta = timedelta(days=number)
127 filter_from = datetime.today() - delta
128 filter_from = filter_from.replace(hour=0, minute=0, second=0)
129
130 elif re.search(r'(\d+[wW]{1})', stamp_filter):
131 number = int(stamp_filter.lower().replace('w', '')) * 7
132 delta = timedelta(days=number)
133 filter_from = datetime.today() - delta
134 filter_from = filter_from.replace(hour=0, minute=0, second=0)
135
136 elif re.search(r'(\d+[mM]{1})', stamp_filter):
137 number = int(stamp_filter.lower().replace('m', ''))
138 past = date.today()
139 # start travelling in time, back to N months ago
140 for n in range(number):
141 past = past.replace(day=1) - timedelta(days=1)
142 # Now use the year/month from the past + the current day to set
143 # the proper date
144 filter_from = datetime(past.year, past.month, date.today().day)
145
146 elif re.search(r'(\d+[yY]{1})', stamp_filter):
147 number = int(stamp_filter.lower().replace('y', ''))
148 today = date.today()
149 filter_from = datetime(today.year - number, today.month, today.day)
150
151 else:
152 # maybe they are giving us a fixed date
153 try:
154 filter_from = datetime.strptime(stamp_filter, DATE_FORMAT)
155 except:
156 # nothing to be used as a filter, go on, printing a warning
157 print('[warning] invalid date filter: ' + stamp_filter)
158 else:
159 filter_from = filter_from.replace(hour=0, minute=0, second=0)
160 filter_to = filter_from + timedelta(days=1)
161
162 return filter_from, filter_to
163
164 @property
165 def customers(self):
166 customers = []
167 for stamp in self.stamps:
168 if stamp['customer'] not in customers:
169 customers.append(stamp['customer'])
170 customers.remove(None)
171 return customers
172
173 def totals(self, filter_from=None, filter_to=None):
174 totals = {}
175 for stamp in self.stamps:
176 customer = stamp['customer']
177 # customer will be None for "start" stamps, having no end time
178 if customer:
179 start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
180 end = datetime.strptime(stamp['end'], DATETIME_FORMAT)
181 if filter_from and start < filter_from:
182 # if there is a filter setting a starting date for the
183 # report and the current stamp is from an earlier date, do
184 # not add it to the totals
185 continue
186 if filter_to and start > filter_to:
187 # similar for the end date
188 continue
189 if customer not in totals:
190 totals[customer] = 0
191 totals[customer] += self.worktime(stamp['start'], stamp['end'])
192 return totals
193
194 def details(self, filter_customer=None, filter_from=None, filter_to=None):
195 details = OrderedDict()
196 totals = OrderedDict()
197 total_customer = OrderedDict()
198 for stamp in self.stamps:
199 customer = stamp['customer']
200 if customer:
201 if filter_customer and customer != filter_customer:
202 # we are getting the details for only one customer, if this
203 # stamp is not for that customer, simply move on and ignore
204 # it
205 continue
206 start = datetime.strptime(stamp['start'], DATETIME_FORMAT)
207 start_day = start.strftime('%Y-%m-%d')
208 end = datetime.strptime(stamp['end'], DATETIME_FORMAT)
209 if filter_from and start < filter_from:
210 # if there is a filter setting a starting date for the
211 # report and the current stamp is from an earlier date, do
212 # not add it to the totals
213 continue
214 if filter_to and start > filter_to:
215 # similar for the end date
216 continue
217 # avoid "start" stamps
218 if start_day not in details:
219 details[start_day] = []
220 if start_day not in totals:
221 totals[start_day] = 0
222 worktime = self.worktime(stamp['start'], stamp['end'])
223 details[start_day].append(
224 ' -> %(worktime)s %(customer)s %(action)s' % {
225 'worktime': str(timedelta(seconds=worktime)),
226 'customer': customer,
227 'action': stamp['action']
228 })
229 totals[start_day] += worktime
230 if start_day not in total_customer:
231 total_customer[start_day] = {}
232 if customer not in total_customer[start_day]:
233 total_customer[start_day][customer] = 0
234 total_customer[start_day][customer] += worktime
235 for day in totals:
236 totals[day] = str(timedelta(seconds=totals[day]))
237 return details, totals, total_customer
238
239 def timeline(self):
240 for stamp in self.stamps:
241 customer = stamp['customer']
242 if not stamp['customer']:
243 print(stamp['start'] + ' start')
244 else:
245 print(' '.join([stamp['end'],
246 stamp['customer'],
247 stamp['action']]))
248
249 def graph_stamps(self, customer=None, stamp_filter=None):
250 """
251 Generate charts with information from the stamps
252 """
253 filter_from, filter_to = self.validate_filter(stamp_filter)
254 chart = pygal.Bar(title='Work hours per day',
255 range=(0, HOURS_DAY),
256 x_title='Days',
257 y_title='Work hours',
258 x_label_rotation=45)
259
260 details, totals, totals_customers = self.details(customer,
261 filter_from,
262 filter_to)
263 days = []
264 values = {}
265 for c in self.customers:
266 values[c] = []
267
268 found = []
269
270 for day in details:
271 for c in values:
272 seconds = totals_customers[day].get(c, 0)
273 if seconds and c not in found:
274 found.append(c)
275 human = timedelta(seconds=seconds).__str__()
276 values[c].append({'value': seconds/60.00/60.00,
277 'label': day + ': ' + human})
278 days.append(day)
279 chart.x_labels = map(str, days)
280
281 if customer:
282 chart.add(customer, values[customer])
283 else:
284 for c in found:
285 chart.add(c, values[c])
286
287 chart_name = 'chart-%s.svg' % datetime.today().strftime(
288 '%Y-%m-%d_%H%M%S')
289 chart_symlink = 'chart-latest.svg'
290 chart.render_to_file('graphs/' + chart_name)
291 if islink('graphs/'+ chart_symlink):
292 remove('graphs/'+ chart_symlink)
293 symlink(chart_name, 'graphs/'+ chart_symlink)
294
295 def show_stamps(self, customer=None, stamp_filter=None, verbose=False,
296 sum=False):
297 filter_from, filter_to = self.validate_filter(stamp_filter)
298
299 # If the user asks for verbose information, show it before the
300 # totals (mimicing what the original stamp tool does)
301 if verbose:
302 details, totals, total_customer = self.details(customer,
303 filter_from,
304 filter_to)
305 for day in details:
306 print('------ %(day)s ------' % {'day': day})
307 for line in details[day]:
308 print(line)
309 print(' Total: %(total)s' % {'total': totals[day]})
310 print '-'*79
311
312 # now calculate the totals and show them
313 totals = self.totals(filter_from, filter_to)
314 if customer:
315 seconds=totals.get(customer, 0)
316 total = timedelta(seconds=totals.get(customer, 0))
317 print(' %(customer)s: %(total)s' % {'customer': customer,
318 'total': total})
319 else:
320 for c in totals:
321 seconds=totals[c]
322 total = timedelta(seconds=totals[c])
323 print(' %(customer)s: %(total)s' % {'customer': c,
324 'total': total})
325
326 if sum:
327 sum_tot = ''
328 if totals:
329 print('------ Totals ------' % {'day': day})
330 for day, tot in totals.iteritems():
331 print(' %(day)s: %(total)s' % {'day': day, 'total': tot})
332 sum_tot = "%(total)s %(new)s" % {
333 'total': sum_tot,
334 'new': total
335 }
336 totalSecs, sec = divmod(seconds, 60)
337 hr, min = divmod(totalSecs, 60)
338 totalDays, remaining = divmod(seconds, SECS_DAY)
339 remainingMin, remainingSec = divmod(remaining, (60))
340 remainingHr, remainingMin = divmod(remainingMin, (60))
341 print('----- %d:%02d:%02d -----' % (hr, min, sec))
342 print('--- %d days, remaining: %d:%02d (%d hours/day) ---' % (
343 totalDays, remainingHr, remainingMin, HOURS_DAY
344 ))
345
346 def remove_stamps(self, n=1):
347 """
348 Remove up to n stamps back, asking for confirmation before delete
349 """
350 for i in range(n):
351 stamp = self.last_stamp()
352 if not stamp['customer']:
353 print(stamp['start'] + ' start')
354 else:
355 print(' '.join([stamp['end'],
356 stamp['customer'],
357 stamp['action']]))
358 confirm = ''
359 while confirm.lower() not in ['y', 'n']:
360 confirm = raw_input('delete stamp? (y/n) ')
361 confirm = confirm.lower()
362 if confirm == 'y':
363 self.stamps.pop()
364 else:
365 # if the user says no to the removal of an stamp, we cannot
366 # keep deleting stamps after that one, as that could leave the
367 # stamps in an inconsistent state.
368 print('Aborting removal of stamps')
369 break
370 self.save_stamps()
371
372 def import_stamps(self, filename):
373 """
374 Import the stamps from the given file into the main stamps list,
375 merging them into the list (removing duplicated entries)
376 """
377 if not exists(filename):
378 print('[error] ' + filename + 'does not exist')
379 return
380 if isdir(filename):
381 print('[error] ' + filename + 'is a directory')
382 return
383 stamps = self.__json_load(filename)
384 if not stamps:
385 print('[warning] no stamps can be imported from ' + filename)
386 return
387 self.stamps.extend(stamps)
388 self.remove_duplicates()
389 self.sort_stamps()
390 self.save_stamps()
391 print('[warning] ' + str(len(stamps)) + ' stamps merged')
392 print('[warning] remember to review the resulting stamps file')
Note: See TracBrowser for help on using the repository browser.