Commit

Fix pipe recreation when restoring to smaller cluster (#50)
Restoring the same table to a smaller cluster is logically divided into
batches. Each batch (one or more) represents one segment's data from the larger
cluster. The writer (the gprestore helper) writes batches to a pipe while the
reader (the COPY command initiated by gprestore) reads and applies all data
from it. If the reader is fast enough, there can be a race between it and the
writer: for the next batch, the reader can open a pipe that has already been
flushed but not yet deleted. This causes the writer to wait endlessly in the
ENXIO loop and the reader to wait on a deleted pipe. The solution is to
recreate the pipe before flushing. The reader is still able to read from the
unlinked pipe, and the next reader loop will open a new pipe with the same
name.
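
Below is a small standalone Go sketch (not part of the commit) illustrating the
property the fix relies on, assuming a POSIX system where syscall.Mkfifo is
available: a reader that already holds the pipe open can keep reading after the
path is unlinked and recreated, while the next open() by name gets the fresh
pipe.

package main

// Standalone sketch (assumption: Linux or macOS, FIFOs available) of the
// behaviour the patch relies on: a reader holding an open descriptor can
// still drain a FIFO after its path has been unlinked and recreated, while
// a later open() by path sees only the freshly created FIFO.

import (
	"fmt"
	"os"
	"syscall"
)

func main() {
	path := "/tmp/demo_pipe"
	_ = os.Remove(path)
	if err := syscall.Mkfifo(path, 0600); err != nil {
		panic(err)
	}

	done := make(chan struct{})
	go func() {
		defer close(done)
		r, err := os.Open(path) // blocks until a writer opens the FIFO
		if err != nil {
			panic(err)
		}
		defer r.Close()
		buf := make([]byte, 64)
		n, _ := r.Read(buf)
		fmt.Printf("reader got %q from the already-unlinked pipe\n", buf[:n])
	}()

	w, err := os.OpenFile(path, os.O_WRONLY, 0) // blocks until the reader opens
	if err != nil {
		panic(err)
	}

	// Unlink and recreate the pipe *before* the final write and close, as
	// the patched helper does. The reader's descriptor still points at the
	// old FIFO, and any later open() of the same name gets the new one.
	_ = os.Remove(path)
	if err := syscall.Mkfifo(path, 0600); err != nil {
		panic(err)
	}

	if _, err := w.Write([]byte("last batch")); err != nil {
		panic(err)
	}
	w.Close()
	<-done
	_ = os.Remove(path)
}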

I spent a lot of time trying to find an explanation of the "data holdover bug"
mentioned in the pipe recreation comment. To save us time in the future, I
decided to update the comment to explain this problem as well. The comment also
highlights the general suboptimality of the current solution and notes a plan
for future refactoring.

I found no easy way to automate testing of this race condition. To demonstrate
the bug, I advise adding a sleep after the flush on a non-patched version;
performing a restore from a larger to a smaller cluster after that will cause
the restore process to hang (see the sketch below).
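
As a hedged illustration only (the placement and the 10-second delay are my
assumptions; flushAndCloseRestoreWriter, log and the LoopEnd label are the
helper's own, and the time package would need to be imported):

// Unpatched ordering with a pause added after the flush, so a fast reader
// reopens the flushed-but-not-yet-deleted pipe; a subsequent restore from a
// larger to a smaller cluster then hangs.
err = flushAndCloseRestoreWriter(currentPipe, oid)
if err != nil {
	log(fmt.Sprintf("Oid %d: Failed to flush and close pipe", oid))
	goto LoopEnd
}
time.Sleep(10 * time.Second) // any delay long enough for the reader to win the race
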
Alexey Gordeev authored Dec 8, 2023
1 parent 89fe316 commit 9ef2f51
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions helper/restore_helper.go
@@ -327,13 +327,22 @@ func doRestoreAgent() error {
log(fmt.Sprintf("Oid %d: Copied %d bytes into the pipe", oid, bytesRead))

log(fmt.Sprintf("Closing pipe for oid %d: %s", oid, currentPipe))
- err = flushAndCloseRestoreWriter(currentPipe, oid)
- if err != nil {
- log(fmt.Sprintf("Oid %d: Failed to flush and close pipe", oid))
- goto LoopEnd
- }

// Recreate pipe to prevent data holdover bug
+ // The mentioned bug is probably caused by an unsynchronized reader
+ // and writer. When the reader is slow enough, the writer can append
+ // another batch of data to the same pipe in the next loop cycle.
+ // In the worst case, the reader can then process all of that data in
+ // the single loop cycle intended for the first batch.
+ // Doing the recreation before the flush is essential. Otherwise, a
+ // fast enough reader can open the pipe that is already flushed but
+ // not yet deleted. This race causes the writer to wait endlessly in
+ // the next ENXIO cycle above and the reader to wait for data on the
+ // other side of the pipe.
+ // TODO: The whole batches loop looks overcomplicated. It seems we
+ // could simplify and speed it up by using a single writer with a
+ // single pipe. Then the problem described here would cease to be
+ // relevant.
err = deletePipe(currentPipe)
if err != nil {
log(fmt.Sprintf("Error deleting pipe %s at end of batch loop: %v", currentPipe, err))
@@ -345,6 +354,12 @@ func doRestoreAgent() error {
goto LoopEnd
}

+ err = flushAndCloseRestoreWriter(currentPipe, oid)
+ if err != nil {
+ log(fmt.Sprintf("Oid %d: Failed to flush and close pipe", oid))
+ goto LoopEnd
+ }

contentToRestore += *destSize
}
log(fmt.Sprintf("Oid %d: Successfully flushed and closed pipe", oid))
