-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathLibScheduler.py
More file actions
executable file
·78 lines (61 loc) · 2.02 KB
/
LibScheduler.py
File metadata and controls
executable file
·78 lines (61 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import threading
import time
import sys
# Number of seconds passed to time.sleep() in the scheduler's polling loop.
MAIN_TICK_INTERVAL_SECS = 1
class TestWorkItem:
    """Sample work item that asks to be run every 60 seconds."""

    def GetIntervalSeconds(self):
        """Return the requested number of seconds between runs (60)."""
        return 60

    def Execute(self):
        """Print a marker line and return False."""
        print("Running")
        return False
class FastWorkItem:
    """Sample work item with a 5-second interval and a 10-second run time.

    Each run sleeps for longer than the requested interval, so the item is
    still busy when it next becomes due.
    """

    def GetIntervalSeconds(self):
        """Return the requested number of seconds between runs (5)."""
        return 5

    def Execute(self):
        """Print a marker line, block for 10 seconds, and return False."""
        print("Running fast")
        time.sleep(10)
        return False
class ScheduledItem:
    """Represents a task that has been scheduled to run.

    Wraps a work item together with the time it was last launched and a
    lock-protected flag that records whether a run is currently in progress.
    """

    def __init__(self, item):
        """Wrap a work item.

        Args:
            item: The work item to wrap; its ``Execute()`` method is called
                by :meth:`Execute`.
        """
        # The wrapped work item.
        self.Item = item
        # Starts at 0; this class never updates it, callers are expected to.
        self.LastExecuted = 0
        # Guards every read and write of _isExecuting.
        self._lock = threading.Lock()
        self._isExecuting = False

    def CanExecute(self):
        """Return True when no run of this item is currently in progress."""
        with self._lock:
            return not self._isExecuting

    def Execute(self):
        """Run the wrapped work item once.

        Exceptions derived from ``Exception`` that the work item raises are
        printed and swallowed. Anything else (for example
        ``KeyboardInterrupt`` or ``SystemExit``) propagates to the caller.
        In every case the in-progress flag is cleared before returning.

        Raises:
            Exception: If a run of this item is already in progress.
        """
        # Claim the item. This sits outside the try/finally below so that a
        # rejected call never clears the flag owned by the active run.
        with self._lock:
            if self._isExecuting:
                raise Exception("Cannot execute this item. CanExecute() == False")
            self._isExecuting = True

        try:
            self.Item.Execute()
        except Exception:
            e = sys.exc_info()
            print("Task threw exception during execution - " + str(e))
        finally:
            # Always release the item, even if reporting the error failed,
            # so it can never get stuck in the "executing" state.
            with self._lock:
                self._isExecuting = False
class Scheduler:
    """Polls registered work items and launches each one when it is due.

    Every launch runs on its own thread, and the polling loop itself runs on
    a background thread started by :meth:`Run`.
    """

    def __init__(self):
        """Create a scheduler with no registered work items."""
        # Kept per instance: a class-level list would be shared by every
        # Scheduler, so items registered on one would run on all of them.
        self._items = []

    def RegisterWorkItem(self, item):
        """Add a work item to the set polled by this scheduler.

        Args:
            item: Object providing ``GetIntervalSeconds()`` and ``Execute()``.
        """
        self._items.append(ScheduledItem(item))

    def Run(self):
        """Start the polling loop on a new thread and return immediately."""
        worker = threading.Thread(target=self._RunInternal)
        worker.start()

    def _RunInternal(self):
        """Loop forever, launching due items and sleeping between passes."""
        while True:
            for item in self._items:
                now = time.time()
                elapsed = now - item.LastExecuted
                if elapsed >= item.Item.GetIntervalSeconds():
                    if item.CanExecute():
                        launcher = threading.Thread(target=item.Execute)
                        launcher.start()
                        item.LastExecuted = now
                    else:
                        # LastExecuted is left alone, so the item is tried
                        # again on the next pass.
                        print("Item is due to execute, but was skipped because it was not ready")
            time.sleep(MAIN_TICK_INTERVAL_SECS)