Skip to content

Commit

Permalink
Never expect the replica iterators to be sorted (benbjohnson#538)
Browse files Browse the repository at this point in the history
  • Loading branch information
hifi authored Dec 18, 2023
1 parent 1af88c4 commit 0a7f6e9
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 17 deletions.
3 changes: 3 additions & 0 deletions cmd/litestream/generations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"os"
"sort"
"text/tabwriter"
"time"

Expand Down Expand Up @@ -90,6 +91,8 @@ func (c *GenerationsCommand) Run(ctx context.Context, args []string) (err error)
continue
}

sort.Strings(generations)

// Iterate over each generation for the replica.
for _, generation := range generations {
createdAt, updatedAt, err := r.GenerationTimeBounds(ctx, generation)
Expand Down
5 changes: 0 additions & 5 deletions file/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"io"
"os"
"path/filepath"
"sort"

"github.com/benbjohnson/litestream"
"github.com/benbjohnson/litestream/internal"
Expand Down Expand Up @@ -180,8 +179,6 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (lites
})
}

sort.Sort(litestream.SnapshotInfoSlice(infos))

return litestream.NewSnapshotInfoSliceIterator(infos), nil
}

Expand Down Expand Up @@ -297,8 +294,6 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (lit
})
}

sort.Sort(litestream.WALSegmentInfoSlice(infos))

return litestream.NewWALSegmentInfoSliceIterator(infos), nil
}

Expand Down
9 changes: 7 additions & 2 deletions replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,10 +1302,15 @@ func (r *Replica) walSegmentMap(ctx context.Context, generation string, minIndex
}
defer itr.Close()

m := make(map[int][]int64)
a := []WALSegmentInfo{}
for itr.Next() {
info := itr.WALSegment()
a = append(a, itr.WALSegment())
}

sort.Sort(WALSegmentInfoSlice(a))

m := make(map[int][]int64)
for _, info := range a {
// Exit if we go past the max timestamp or index.
if !maxTimestamp.IsZero() && info.CreatedAt.After(maxTimestamp) {
break // after max timestamp, skip
Expand Down
6 changes: 3 additions & 3 deletions replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ type ReplicaClient interface {
// Returns the type of client.
Type() string

// Returns a list of available generations.
// Returns a list of available generations. Order is undefined.
Generations(ctx context.Context) ([]string, error)

// Deletes all snapshots & WAL segments within a generation.
DeleteGeneration(ctx context.Context, generation string) error

// Returns an iterator of all snapshots within a generation on the replica.
// Returns an iterator of all snapshots within a generation on the replica. Order is undefined.
Snapshots(ctx context.Context, generation string) (SnapshotIterator, error)

// Writes LZ4 compressed snapshot data to the replica at a given index
Expand All @@ -31,7 +31,7 @@ type ReplicaClient interface {
// the snapshot does not exist.
SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error)

// Returns an iterator of all WAL segments within a generation on the replica.
// Returns an iterator of all WAL segments within a generation on the replica. Order is undefined.
WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)

// Writes an LZ4 compressed WAL segment at a given position.
Expand Down
7 changes: 0 additions & 7 deletions sftp/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net"
"os"
"path"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -141,8 +140,6 @@ func (c *ReplicaClient) Generations(ctx context.Context) (_ []string, err error)
generations = append(generations, name)
}

sort.Strings(generations)

return generations, nil
}

Expand Down Expand Up @@ -229,8 +226,6 @@ func (c *ReplicaClient) Snapshots(ctx context.Context, generation string) (_ lit
})
}

sort.Sort(litestream.SnapshotInfoSlice(infos))

return litestream.NewSnapshotInfoSliceIterator(infos), nil
}

Expand Down Expand Up @@ -363,8 +358,6 @@ func (c *ReplicaClient) WALSegments(ctx context.Context, generation string) (_ l
})
}

sort.Sort(litestream.WALSegmentInfoSlice(infos))

return litestream.NewWALSegmentInfoSliceIterator(infos), nil
}

Expand Down

0 comments on commit 0a7f6e9

Please sign in to comment.