diff --git a/source-postgres/replication.go b/source-postgres/replication.go index 99c006ec9b..7dda76d688 100644 --- a/source-postgres/replication.go +++ b/source-postgres/replication.go @@ -62,15 +62,12 @@ func (db *postgresDatabase) ReplicationStream(ctx context.Context, startCursor s logrus.WithField("err", err).Debug("error recreating replication slot") } - // Obtain the current WAL flush location via an IDENTIFY_SYSTEM command. - // Note: We use IDENTIFY_SYSTEM here for legacy reasons, it might be cleaner - // to get this information from the newly-recreated slot's confirmed_flush_lsn - // now that we do it that way. - var sysident, err = pglogrepl.IdentifySystem(ctx, conn) + // Obtain the current WAL flush location on the server and initialize the cursor to that point. + var flushLSN, err = queryLatestServerLSN(ctx, db.conn) if err != nil { - return nil, fmt.Errorf("unable to read WAL flush LSN from database: %w", err) + return nil, fmt.Errorf("unable to initialize cursor to current server LSN: %w", err) } - startLSN = sysident.XLogPos + startLSN = flushLSN } // Check that the slot's `confirmed_flush_lsn` is less than or equal to our resume cursor value.