Skip to content

Commit 1ab7d9d

Browse files
authored
Merge pull request #49 from fly-apps/fence
Fencing a failed primary
2 parents a3a90ee + fdc2d32 commit 1ab7d9d

File tree

9 files changed

+768
-63
lines changed

9 files changed

+768
-63
lines changed

cmd/event_handler/main.go

+71-10
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"os"
78
"strconv"
9+
"time"
810

911
"github.com/fly-apps/postgres-flex/pkg/flypg"
1012
)
@@ -21,27 +23,87 @@ func main() {
2123
details := flag.String("details", "", "details")
2224
flag.Parse()
2325

24-
fmt.Printf("Event: %s\n Node: %d\n Success: %s\n Details: %s\n",
25-
*event, *nodeID, *success, *details)
26+
eventDetails := fmt.Sprintf("%s - Event: %s\n Node: %d\n Success: %s\n Details: %s\n", time.Now().String(), *event, *nodeID, *success, *details)
27+
28+
// TODO - Use an actual logging framework instead of just writing strings to a file.
29+
logFile, err := os.OpenFile("/data/event.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
30+
if err != nil {
31+
fmt.Printf("failed to open event log: %s", err)
32+
}
33+
defer logFile.Close()
34+
35+
logFile.WriteString(eventDetails)
2636

2737
switch *event {
38+
2839
case "repmgrd_failover_promote", "standby_promote":
2940
// TODO - Need to figure out what to do when success == 0.
30-
if err := reconfigurePGBouncer(*nodeID); err != nil {
31-
fmt.Println(err.Error())
32-
return
41+
42+
retry := 0
43+
maxRetries := 5
44+
success := false
45+
46+
for retry < maxRetries {
47+
if err := reconfigurePGBouncer(*nodeID); err != nil {
48+
errMsg := fmt.Sprintf("%s [%s] attempt: %d - failed to reconfigure pgbouncer: %s\n", *event, time.Now().String(), retry, err)
49+
logFile.WriteString(errMsg)
50+
51+
retry++
52+
time.Sleep(1 * time.Second)
53+
continue
54+
}
55+
56+
success = true
57+
break
58+
}
59+
60+
if success {
61+
msg := fmt.Sprintf("%s [%s] Successfully reconfigured pgBouncer to %d\n", *event, time.Now().String(), *nodeID)
62+
logFile.WriteString(msg)
63+
os.Exit(0)
64+
} else {
65+
msg := fmt.Sprintf(" %s [%s] Failed ot reconfigured pgBouncer to %d\n", *event, time.Now().String(), *nodeID)
66+
logFile.WriteString(msg)
67+
os.Exit(1)
3368
}
3469

3570
case "standby_follow":
71+
3672
newMemberID, err := strconv.Atoi(*newPrimary)
3773
if err != nil {
38-
fmt.Printf("failed to parse new member id: %s", err)
74+
errMsg := fmt.Sprintf("failed to parse newMemberID %s: %s\n", *newPrimary, err)
75+
logFile.WriteString(errMsg)
76+
os.Exit(1)
3977
}
4078

41-
if err := reconfigurePGBouncer(newMemberID); err != nil {
42-
fmt.Println(err.Error())
43-
return
79+
retry := 0
80+
maxRetries := 5
81+
success := false
82+
83+
for retry < maxRetries {
84+
if err := reconfigurePGBouncer(*&newMemberID); err != nil {
85+
errMsg := fmt.Sprintf("%s [%s] attempt: %d - failed to reconfigure pgbouncer: %s\n", *event, time.Now().String(), retry, err)
86+
logFile.WriteString(errMsg)
87+
88+
retry++
89+
time.Sleep(1 * time.Second)
90+
continue
91+
}
92+
93+
success = true
94+
break
95+
}
96+
97+
if success {
98+
msg := fmt.Sprintf("%s [%s] Successfully reconfigured pgBouncer to %d\n", *event, time.Now().String(), newMemberID)
99+
logFile.WriteString(msg)
100+
os.Exit(0)
101+
} else {
102+
msg := fmt.Sprintf(" %s [%s] Failed ot reconfigured pgBouncer to %d\n", *event, time.Now().String(), newMemberID)
103+
logFile.WriteString(msg)
104+
os.Exit(1)
44105
}
106+
45107
default:
46108
// noop
47109
}
@@ -63,7 +125,6 @@ func reconfigurePGBouncer(id int) error {
63125
return err
64126
}
65127

66-
fmt.Println("Reconfiguring pgbouncer primary")
67128
if err := node.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil {
68129
return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err)
69130
}

cmd/start/main.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,20 @@ func main() {
2222
node, err := flypg.NewNode()
2323
if err != nil {
2424
panicHandler(err)
25+
return
2526
}
2627

2728
ctx := context.Background()
2829

2930
if err = node.Init(ctx); err != nil {
3031
panicHandler(err)
32+
return
3133
}
3234

3335
go func() {
3436
t := time.NewTicker(1 * time.Second)
3537
defer t.Stop()
3638
for range t.C {
37-
3839
if err := node.PostInit(ctx); err != nil {
3940
fmt.Printf("failed post-init: %s. Retrying...\n", err)
4041
continue
@@ -44,7 +45,8 @@ func main() {
4445
}()
4546

4647
svisor := supervisor.New("flypg", 5*time.Minute)
47-
svisor.AddProcess("flypg", fmt.Sprintf("gosu postgres postgres -D %s -p %d", node.DataDir, node.Port))
48+
49+
svisor.AddProcess("postgres", fmt.Sprintf("gosu postgres postgres -D %s -p %d", node.DataDir, node.Port))
4850

4951
svisor.AddProcess("pgbouncer", fmt.Sprintf("pgbouncer %s/pgbouncer.ini", node.PGBouncer.ConfigPath),
5052
supervisor.WithRestart(0, 1*time.Second),

pkg/flycheck/role.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package flycheck
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/fly-apps/postgres-flex/pkg/flypg"
78
"github.com/pkg/errors"
@@ -26,14 +27,23 @@ func PostgreSQLRole(ctx context.Context, checks *check.CheckSuite) (*check.Check
2627
}
2728

2829
checks.AddCheck("role", func() (string, error) {
30+
if flypg.ZombieLockExists() {
31+
return "zombie", fmt.Errorf("member is in a zombie state. see logs for more details")
32+
}
33+
2934
member, err := node.RepMgr.Member(ctx, conn)
3035
if err != nil {
3136
return "failed", errors.Wrap(err, "failed to check role")
3237
}
3338

3439
switch member.Role {
3540
case flypg.PrimaryRoleName:
36-
return "primary", nil
41+
if member.Active {
42+
return "primary", nil
43+
} else {
44+
return "zombie", nil
45+
}
46+
3747
case flypg.StandbyRoleName:
3848
return "replica", nil
3949
default:

pkg/flypg/admin/admin.go

+42
Original file line numberDiff line numberDiff line change
@@ -330,3 +330,45 @@ func GetSetting(ctx context.Context, pg *pgx.Conn, setting string) (*PGSetting,
330330
}
331331
return &out, nil
332332
}
333+
334+
func SetReadOnly(ctx context.Context, conn *pgx.Conn) error {
335+
databases, err := ListDatabases(ctx, conn)
336+
if err != nil {
337+
return err
338+
}
339+
340+
for _, db := range databases {
341+
if db.Name == "repmgr" || db.Name == "postgres" {
342+
continue
343+
}
344+
345+
sql := fmt.Sprintf("ALTER DATABASE %s set default_transaction_read_only = true;", db.Name)
346+
_, err := conn.Exec(ctx, sql)
347+
if err != nil {
348+
return err
349+
}
350+
}
351+
352+
return nil
353+
}
354+
355+
func UnsetReadOnly(ctx context.Context, conn *pgx.Conn) error {
356+
databases, err := ListDatabases(ctx, conn)
357+
if err != nil {
358+
return err
359+
}
360+
361+
for _, db := range databases {
362+
if db.Name == "repmgr" || db.Name == "postgres" {
363+
continue
364+
}
365+
366+
sql := fmt.Sprintf("ALTER DATABASE %s set default_transaction_read_only = false;", db.Name)
367+
_, err := conn.Exec(ctx, sql)
368+
if err != nil {
369+
return err
370+
}
371+
}
372+
373+
return nil
374+
}

0 commit comments

Comments
 (0)