-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobs.go
257 lines (219 loc) · 7.43 KB
/
jobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package main
import (
"bytes"
"fmt"
"io"
"mime/multipart"
"net/http"
"path"
"strings"
"time"
"github.com/google/uuid"
)
// newJob intercepts an upload, runs config.Tasks on the file found under
// filterFormKey and POSTs a rebuilt multipart form to the upstream server
// (remote) under the same path as the incoming request. The processed file is
// only sent when it is smaller than the original.
//
// Clients that follow redirects are sent a 307 to the
// /_immich-upload-optimizer/wait page right away, so that slow file
// processing does not time out their request; the upstream response is later
// handed to that page (continueJob) over jobChannels[jobID]. Clients whose user
// agent starts with "Dart/" (the android app) instead receive the upstream
// status code and body directly on this request.
//
// The returned error describes the step that failed. Every failure between
// registering the job and receiving the upstream response also writes a 500
// response for clients that do not follow redirects.
func newJob(r *http.Request, w http.ResponseWriter, logger *customLogger) (err error) {
	jobID := uuid.New().String()

	// Check if client has broken redirect behavior, like the android app.
	// Redirect support is necessary as file processing can take a long time and
	// the client risks a timeout on the http request. Currently there is no way
	// to check for redirect support, so the Dart/ user agent of the android app
	// is blacklisted.
	// Ideally redirect support is added here: https://github.com/immich-app/immich/blob/f6cbc9db06c0783d09f154f66e12d041032fff62/cli/src/commands/asset.ts#L290
	clientFollowsRedirects := !strings.HasPrefix(r.UserAgent(), "Dart/")
	if !clientFollowsRedirects {
		logger = newCustomLogger(logger, "client with broken redirects: ")
	}
	jobLogger := newCustomLogger(logger, fmt.Sprintf("job %s: ", jobID))
	jobLogger.Printf("intercepting upload")

	formFile, formFileHeader, err := r.FormFile(filterFormKey)
	if err != nil {
		return fmt.Errorf("unable to read file in key %s from uploaded form data: %w", filterFormKey, err)
	}
	defer formFile.Close()
	jobLogger.Printf("uploaded %s %s", formFileHeader.Filename, humanReadableSize(formFileHeader.Size))

	// Register the channels continueJob uses to pick up the upstream response.
	// NOTE(review): jobChannels and jobChannelsComplete are package-level maps
	// that continueJob reads from other request goroutines without any visible
	// locking; that is a data race which needs a mutex declared next to them.
	respCh := make(chan *http.Response)
	// Buffered so continueJob's single "response sent" signal never has to
	// wait for (or miss) the receiver in the goroutine below.
	completeCh := make(chan struct{}, 1)
	jobChannels[jobID] = respCh
	jobChannelsComplete[jobID] = completeCh
	cleanup1 := func() {
		// Closing the response channel tells a waiting continueJob that no
		// response will be handed over.
		close(respCh)
		delete(jobChannels, jobID)
		// completeCh is sent on by continueJob, so it is only forgotten here and
		// never closed: closing it could make a late signal panic with
		// "send on closed channel".
		delete(jobChannelsComplete, jobID)
	}

	// fail runs cleanup, tells a client that does not follow redirects that the
	// upload failed (a redirected client is notified through the closed job
	// channel instead) and returns cause for the caller.
	fail := func(cleanup func(), cause error) error {
		cleanup()
		if !clientFollowsRedirects {
			http.Error(w, "failed to process file, view logs for more info", http.StatusInternalServerError)
		}
		return cause
	}

	// Redirect the user to the job wait page before the slow work starts.
	if clientFollowsRedirects {
		http.Redirect(w, r, fmt.Sprintf("/_immich-upload-optimizer/wait?job=%s", jobID), http.StatusTemporaryRedirect)
		// Push the redirect out now. Not every ResponseWriter (e.g. one wrapped
		// by middleware) supports flushing, so don't assume it does.
		if flusher, ok := w.(http.Flusher); ok {
			flusher.Flush()
		}
	}

	// Continue processing the file.
	tp, err := NewTaskProcessorFromMultipart(formFile, formFileHeader)
	if err != nil {
		return fail(cleanup1, fmt.Errorf("unable to create task processor: %w", err))
	}
	tp.SetLogger(jobLogger)
	cleanup2 := func() {
		tp.Close()
		cleanup1()
	}
	if err = tp.Process(config.Tasks); err != nil {
		return fail(cleanup2, fmt.Errorf("failed to process file in job %s: %w", jobID, err))
	}

	// Only upload the processed file when it is actually smaller.
	replace := tp.OriginalSize > tp.ProcessedSize
	uploadFilename := tp.OriginalFilename
	uploadFile := tp.OriginalFile
	if replace {
		uploadFilename = tp.ProcessedFilename
		uploadFile = tp.ProcessedFile
	}

	// Create the form data to be sent upstream: every non-file field is copied
	// unchanged and the file field carries the chosen file.
	var buffer bytes.Buffer
	writer := multipart.NewWriter(&buffer)
	for key, values := range r.MultipartForm.Value {
		for _, value := range values {
			if err = writer.WriteField(key, value); err != nil {
				return fail(cleanup2, fmt.Errorf("unable to create form data to be sent upstream: %w", err))
			}
		}
	}
	part, err := writer.CreateFormFile(filterFormKey, uploadFilename)
	if err != nil {
		return fail(cleanup2, fmt.Errorf("unable to create file form field to be sent upstream: %w", err))
	}
	// Rewind the file that is actually sent; its offset may not be at the
	// start after processing. Rewinding only the original file would send an
	// empty or truncated processed file.
	if _, err = uploadFile.Seek(0, io.SeekStart); err != nil {
		return fail(cleanup2, fmt.Errorf("unable to seek beginning of temp file: %w", err))
	}
	if _, err = io.Copy(part, uploadFile); err != nil {
		return fail(cleanup2, fmt.Errorf("unable to write file in form field to be sent upstream: %w", err))
	}
	if err = writer.Close(); err != nil {
		return fail(cleanup2, fmt.Errorf("unable to finish form data to be sent upstream: %w", err))
	}

	// Send the request to the upstream server under the same path.
	destination := *remote
	destination.Path = path.Join(destination.Path, r.URL.Path)
	req, err := http.NewRequest(http.MethodPost, destination.String(), &buffer)
	if err != nil {
		return fail(cleanup2, fmt.Errorf("unable to create POST request to upstream: %w", err))
	}
	for key, values := range r.Header {
		for _, value := range values {
			req.Header.Add(key, value)
		}
	}
	// The body was re-encoded with a new multipart boundary, so the client's
	// Content-Type must be replaced rather than appended to.
	req.Header.Set("Content-Type", writer.FormDataContentType())
	// Upstream response headers (including Content-Encoding) are never relayed
	// to the client, so the body must reach it decoded: let the transport
	// negotiate and transparently decode compression instead of forwarding the
	// client's Accept-Encoding.
	req.Header.Del("Accept-Encoding")
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return fail(cleanup2, fmt.Errorf("unable to POST to upstream: %w", err))
	}
	// resp.Body is closed by cleanup3, only after the response was relayed.

	// Log the result.
	action := "file NOT replaced"
	if replace {
		action = "file replaced"
	}
	jobLogger.Printf("%s: \"%s\" %s optimized to \"%s\" %s", action, tp.OriginalFilename, humanReadableSize(tp.OriginalSize), tp.ProcessedFilename, humanReadableSize(tp.ProcessedSize))
	cleanup3 := func() {
		resp.Body.Close()
		cleanup2()
	}

	if !clientFollowsRedirects {
		defer cleanup3()
		w.WriteHeader(resp.StatusCode)
		if _, err = io.Copy(w, resp.Body); err != nil {
			return fmt.Errorf("unable to forward response back to client directly: %w", err)
		}
		jobLogger.Printf("response sent back to client directly")
		return nil
	}

	// Allow the function to return so the client request ends; the response
	// is handed to the wait page in the background.
	go func() {
		defer cleanup3()
		select {
		case respCh <- resp:
			// Wait for the response to be sent to the client before cleaning up
			// (which closes resp.Body), or time out.
			select {
			case <-completeCh:
				jobLogger.Printf("response sent to client")
			case <-time.After(10 * time.Second):
				jobLogger.Printf("timeout before response was fully sent to client")
			}
		case <-time.After(10 * time.Second):
			jobLogger.Printf("timeout while waiting for client to ask for a response on the redirect wait page, redirect was not followed by the client.")
		}
	}()
	return nil
}
// continueJob serves the wait page (/_immich-upload-optimizer/wait?job=ID)
// that newJob redirects clients to. It waits up to 55 seconds for the job's
// upstream response on jobChannels[jobID] and relays its status code and body;
// response headers are intentionally not relayed (see the TODO below). If the
// job is still running after 55 seconds, the client is redirected back to the
// same URL so that its own request does not time out. Unknown or missing job
// IDs get a 400 response.
func continueJob(r *http.Request, w http.ResponseWriter, requestLogger *customLogger) {
	jobID := r.URL.Query().Get("job")
	// NOTE(review): jobChannels is written by newJob on other request
	// goroutines without visible locking; this lookup races with those writes.
	jobChannel, exists := jobChannels[jobID]
	if jobID == "" || !exists {
		http.Error(w, "job not found", http.StatusBadRequest)
		return
	}
	jobLogger := newCustomLogger(requestLogger, fmt.Sprintf("job %s: ", jobID))

	// Read the re-sent POST body so the request is not left hanging; some
	// proxies like Cloudflare error without this. Up to 10 MB is kept in memory
	// to prevent disk writes. The error is deliberately discarded: the upload
	// itself was already received by newJob.
	_ = r.ParseMultipartForm(10 << 20)

	// 55s to avoid browser timeout.
	const safeClientTimeout = 55 * time.Second
	select {
	case resp, ok := <-jobChannel:
		if !ok {
			// newJob closes the channel when it cleans up a job without
			// handing over a response.
			const msg = "job channel closed unexpectedly"
			http.Error(w, msg, http.StatusInternalServerError)
			requestLogger.Printf("%s", msg)
			return
		}
		// @TODO
		// Upstream response headers are not forwarded: they may contain cookie
		// headers that would leak to whoever asks for this job. Before enabling
		// the copy below, a job should only match the session that created it.
		// for key, values := range resp.Header {
		// 	for _, value := range values {
		// 		w.Header().Add(key, value)
		// 	}
		// }
		w.WriteHeader(resp.StatusCode)
		if _, err := io.Copy(w, resp.Body); err != nil {
			jobLogger.Printf("unable to forward response back to client: %v", err)
		}
		// Signal that resp.Body has been consumed so newJob can release it.
		// The send must not block: if newJob already stopped waiting, its
		// cleanup removed the map entry, the lookup yields a nil channel, and
		// a plain send would hang this handler forever.
		select {
		case jobChannelsComplete[jobID] <- struct{}{}:
		default:
		}
	case <-time.After(safeClientTimeout):
		http.Redirect(w, r, r.URL.String(), http.StatusTemporaryRedirect)
		jobLogger.Printf("still running, sending redirect to avoid client timeout: %s", r.URL)
	}
}