@@ -8,11 +8,14 @@ use super::DataStore;
8
8
use crate :: context:: OpContext ;
9
9
use crate :: db:: error:: public_error_from_diesel;
10
10
use crate :: db:: error:: ErrorHandler ;
11
+ use crate :: db:: model:: SqlU8 ;
11
12
use crate :: db:: model:: WebhookDelivery ;
12
13
use crate :: db:: model:: WebhookDeliveryAttempt ;
13
14
use crate :: db:: schema:: webhook_delivery:: dsl;
15
+ use crate :: db:: schema:: webhook_delivery_attempt:: dsl as attempt_dsl;
14
16
use crate :: db:: schema:: webhook_event:: dsl as event_dsl;
15
17
use crate :: db:: update_and_check:: UpdateAndCheck ;
18
+ use crate :: db:: update_and_check:: UpdateAndQueryResult ;
16
19
use crate :: db:: update_and_check:: UpdateStatus ;
17
20
use async_bb8_diesel:: AsyncRunQueryDsl ;
18
21
use chrono:: TimeDelta ;
@@ -43,13 +46,13 @@ impl DataStore {
43
46
. filter ( dsl:: time_completed. is_null ( ) )
44
47
. filter ( dsl:: rx_id. eq ( rx_id. into_untyped_uuid ( ) ) )
45
48
. filter (
46
- ( dsl:: time_delivery_started
47
- . is_null ( )
48
- . and ( dsl :: deliverator_id . is_null ( ) ) )
49
- . or ( dsl:: time_delivery_started. is_not_null ( ) . and (
50
- dsl :: time_delivery_started
51
- . le ( now . nullable ( ) - lease_timeout ) ,
52
- ) ) ,
49
+ ( dsl:: deliverator_id . is_null ( ) ) . or ( dsl :: time_delivery_started
50
+ . is_not_null ( )
51
+ . and (
52
+ dsl:: time_delivery_started
53
+ . le ( now . nullable ( ) - lease_timeout ) ,
54
+ ) ) ,
55
+ // TODO(eliza): retry backoffs...?
53
56
)
54
57
. order_by ( dsl:: time_created. asc ( ) )
55
58
// Join with the `webhook_event` table to get the event class, which
@@ -78,13 +81,12 @@ impl DataStore {
78
81
. filter ( dsl:: time_completed. is_null ( ) )
79
82
. filter ( dsl:: id. eq ( id) )
80
83
. filter (
81
- ( dsl:: time_delivery_started
82
- . is_null ( )
83
- . and ( dsl:: deliverator_id. is_null ( ) ) )
84
- . or ( dsl:: time_delivery_started. is_not_null ( ) . and (
85
- dsl:: time_delivery_started
86
- . le ( now. nullable ( ) - lease_timeout) ,
87
- ) ) ,
84
+ dsl:: deliverator_id. is_null ( ) . or ( dsl:: time_delivery_started
85
+ . is_not_null ( )
86
+ . and (
87
+ dsl:: time_delivery_started
88
+ . le ( now. nullable ( ) - lease_timeout) ,
89
+ ) ) ,
88
90
)
89
91
. set ( (
90
92
dsl:: time_delivery_started. eq ( now. nullable ( ) ) ,
@@ -128,8 +130,79 @@ impl DataStore {
128
130
opctx : & OpContext ,
129
131
delivery : & WebhookDelivery ,
130
132
nexus_id : & OmicronZoneUuid ,
131
- result : & WebhookDeliveryAttempt ,
133
+ attempt : & WebhookDeliveryAttempt ,
132
134
) -> Result < ( ) , Error > {
133
- Err ( Error :: internal_error ( "TODO ELIZA DO THIS PART" ) )
135
+ const MAX_ATTEMPTS : u8 = 4 ;
136
+ let conn = self . pool_connection_authorized ( opctx) . await ?;
137
+ diesel:: insert_into ( attempt_dsl:: webhook_delivery_attempt)
138
+ . values ( attempt. clone ( ) )
139
+ . on_conflict ( ( attempt_dsl:: delivery_id, attempt_dsl:: attempt) )
140
+ . do_nothing ( )
141
+ . returning ( WebhookDeliveryAttempt :: as_returning ( ) )
142
+ . execute_async ( & * conn)
143
+ . await
144
+ . map_err ( |e| public_error_from_diesel ( e, ErrorHandler :: Server ) ) ?;
145
+
146
+ // Has the delivery either completed successfully or exhausted all of
147
+ // its retry attempts?
148
+ let succeeded =
149
+ attempt. result == nexus_db_model:: WebhookDeliveryResult :: Succeeded ;
150
+ let failed_permanently = * attempt. attempt >= MAX_ATTEMPTS ;
151
+ let ( completed, new_nexus_id) = if succeeded || failed_permanently {
152
+ // If the delivery has succeeded or failed permanently, set the
153
+ // "time_completed" timestamp to mark it as finished. Also, leave
154
+ // the delivering Nexus ID in place to maintain a record of who
155
+ // finished the delivery.
156
+ ( Some ( Utc :: now ( ) ) , Some ( nexus_id. into_untyped_uuid ( ) ) )
157
+ } else {
158
+ // Otherwise, "unlock" the delivery for other nexii.
159
+ ( None , None )
160
+ } ;
161
+ let prev_attempts = SqlU8 :: new ( ( * attempt. attempt ) - 1 ) ;
162
+ let UpdateAndQueryResult { status, found } =
163
+ diesel:: update ( dsl:: webhook_delivery)
164
+ . filter ( dsl:: id. eq ( delivery. id . into_untyped_uuid ( ) ) )
165
+ . filter ( dsl:: deliverator_id. eq ( nexus_id. into_untyped_uuid ( ) ) )
166
+ . filter ( dsl:: attempts. eq ( prev_attempts) )
167
+ // Don't mark a delivery as completed if it's already completed!
168
+ . filter ( dsl:: time_completed. is_null ( ) )
169
+ . set ( (
170
+ dsl:: time_completed. eq ( completed) ,
171
+ // XXX(eliza): hmm this might be racy; we should probably increment this
172
+ // in place and use it to determine the attempt number?
173
+ dsl:: attempts. eq ( attempt. attempt ) ,
174
+ dsl:: deliverator_id. eq ( new_nexus_id) ,
175
+ ) )
176
+ . check_if_exists :: < WebhookDelivery > ( delivery. id )
177
+ . execute_and_check ( & conn)
178
+ . await
179
+ . map_err ( |e| {
180
+ public_error_from_diesel ( e, ErrorHandler :: Server )
181
+ } ) ?;
182
+
183
+ if status == UpdateStatus :: Updated {
184
+ return Ok ( ( ) ) ;
185
+ }
186
+
187
+ if let Some ( other_nexus_id) = found. deliverator_id {
188
+ return Err ( Error :: conflict ( format ! (
189
+ "cannot mark delivery completed, as {other_nexus_id:?} was \
190
+ attempting to deliver it",
191
+ ) ) ) ;
192
+ }
193
+
194
+ if found. time_completed . is_some ( ) {
195
+ return Err ( Error :: conflict (
196
+ "delivery was already marked as completed" ,
197
+ ) ) ;
198
+ }
199
+
200
+ if found. attempts != prev_attempts {
201
+ return Err ( Error :: conflict ( "wrong number of delivery attempts" ) ) ;
202
+ }
203
+
204
+ Err ( Error :: internal_error (
205
+ "couldn't update delivery for some other reason i didn't think of here..."
206
+ ) )
134
207
}
135
208
}
0 commit comments