-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathScheduled monthly.py
141 lines (133 loc) · 4.99 KB
/
Scheduled monthly.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
inputs:
- message_id:
type: str
required: False
doc: |
The UUID of the message form the user was asked to fill in.
If unset, the flow will create a message form.
- flow_name:
type: str
required: False
doc: the name of the flow script to be scheduled
"""
import datetime
import dateutil.relativedelta
def _monthly_occurrence(year, month, day, time_t):
    """Return the datetime for ``day`` of ``year``/``month`` at ``time_t``.

    ``day`` is clamped to the last day of the month, so asking for day 31
    in a 30-day month (or in February) yields that month's final day
    instead of raising ``ValueError``.
    """
    # Function-scope import: keeps the file's top-level import block untouched.
    import calendar

    last_day = calendar.monthrange(year, month)[1]
    occurrence_date = datetime.date(year, month, min(day, last_day))
    return datetime.datetime.combine(occurrence_date, time_t)


def handler(system, this):
    """
    Create a message form to ask for details and schedule a flow

    The behaviour depends on the ``message_id`` input:

    * ``message_id`` unset: a message form is created asking for the flow
      name, the day of the month, the time and an optional maximum number
      of iterations. The form's id is saved as output value and a new
      'Scheduled monthly' execution is started with that id (without
      waiting for it). Returns ``this.success('requested details')``.
    * ``message_id`` set: the form response is awaited. Then, in a loop,
      the next monthly occurrence is computed, the execution sleeps until
      then and starts the requested flow as a child (without waiting for
      it). The loop ends after ``max_iterations`` children were started;
      it runs indefinitely if ``max_iterations`` was omitted.

    Raises ``ValueError`` if the requested day of month is outside 1..31
    or the submitted time does not match ``'%H:%M:%S%z'``.
    """
    inputs = this.get('input_value') or {}
    message_id = inputs.get('message_id')

    if message_id is None:
        defaults = {
            'scheduled_at_day': 1,
            'scheduled_at_time': '08:30',
        }
        if 'flow_name' in inputs:
            defaults['flow_name'] = inputs['flow_name']
        message = system.message(
            subject='Monthly scheduled execution',
            body={
                'type': 'object',
                'properties': {
                    'flow_name': {
                        'label': 'Name of the flow which should be scheduled monthly',
                        'element': 'string',
                        'type': 'string',
                        'example': defaults.get('flow_name'),
                        'default': defaults.get('flow_name'),
                        'order': 1,
                    },
                    'scheduled_at_day': {
                        'label': 'Day of the month when the child execution should be started',
                        'element': 'number',
                        'type': 'number',
                        'default': defaults['scheduled_at_day'],
                        'order': 2,
                    },
                    'scheduled_at_time': {
                        'label': 'Time when the child execution should be started',
                        'element': 'time',
                        'type': 'string',
                        'format': 'time',
                        'default': defaults['scheduled_at_time'],
                        'order': 3,
                    },
                    'max_iterations': {
                        'label': 'Maximum number of iterations (unlimited if omitted)',
                        'element': 'number',
                        'type': 'number',
                        'order': 4,
                    },
                    'start': {
                        'label': 'Start monthly schedule',
                        'element': 'submit',
                        'order': 5,
                    },
                },
                'required': [
                    'flow_name',
                    'scheduled_at_day',
                    'scheduled_at_time',
                ],
            },
        )
        message_id = message.get('id')
        this.save(output_value={
            'message_id': message_id,
        })
        # Hand over to a second execution which waits for the form response.
        this.flow(
            'Scheduled monthly',
            name='Monthly scheduled execution',
            message_id=message_id,
            wait=False,
        )
        return this.success('requested details')

    message = system.message(message_id)
    response = message.wait().get('response')
    this.log(response=response)
    flow_name = response['flow_name']
    # The form field is declared as 'number', so the value may not be an int;
    # date construction only accepts integers.
    scheduled_at_day = int(response['scheduled_at_day'])
    if not 1 <= scheduled_at_day <= 31:
        raise ValueError(
            f'scheduled_at_day must be between 1 and 31, got {scheduled_at_day}'
        )
    scheduled_at_time = response['scheduled_at_time']
    max_iterations = response.get('max_iterations')
    this.save(name=f'Scheduled {flow_name}')

    # NOTE(review): the parse requires seconds and a UTC offset, while the
    # form's default value is '08:30' - confirm what the form actually submits.
    scheduled_at_time_t = datetime.datetime.strptime(scheduled_at_time, '%H:%M:%S%z').timetz()
    this.log(scheduled_at_time_t=scheduled_at_time_t)

    iterations = 0
    start = datetime.datetime.now(datetime.timezone.utc).timestamp()
    previous_t = None  # occurrence the previous iteration slept for
    while max_iterations is None or iterations < max_iterations:
        now = datetime.datetime.now(datetime.timezone.utc)
        this.log(now=now)
        today = now.date()
        this.log(today=today, day=today.day)

        year, month = today.year, today.month
        scheduled_t = _monthly_occurrence(year, month, scheduled_at_day, scheduled_at_time_t)
        # Move on month by month while the candidate is already in the past,
        # or is not later than the occurrence handled by the previous
        # iteration (guards against a duplicate run if the sleep ended early).
        # The day is clamped anew for every month.
        while scheduled_t < now or (previous_t is not None and scheduled_t <= previous_t):
            year, month = (year + 1, 1) if month == 12 else (year, month + 1)
            scheduled_t = _monthly_occurrence(year, month, scheduled_at_day, scheduled_at_time_t)
        previous_t = scheduled_t
        this.log(scheduled_t=scheduled_t)

        scheduled_ts = scheduled_t.isoformat(sep=' ', timespec='minutes')
        this.log(scheduled_ts=scheduled_ts)
        delta_sec = (scheduled_t - now).total_seconds()
        this.log(delta_sec=delta_sec)
        this.save(message=scheduled_ts)
        this.sleep(delta_sec)

        iterations += 1
        this.save(message=f'iteration {iterations}/{max_iterations}')
        # Start child execution
        child_inputs = {
            'start': start,
            'iterations': iterations,
            'max_iterations': max_iterations,
        }
        this.flow(
            flow_name,
            inputs=child_inputs,
            name=f'{flow_name} iteration #{iterations}',
            wait=False,
        )

    return this.success(f'started {iterations} iterations')