From cffad9c255bcbc1b9a434361c270259021df29ff Mon Sep 17 00:00:00 2001 From: James Yin Date: Mon, 16 May 2022 15:53:06 +0800 Subject: [PATCH] feat(SegmentServer): compaction and recovery (#32) * fix: truncate then append * fix: visit log stream from compacted * fix: check id when create block * feat: record previous term when switch term * feat(SegmentServer): recovery * feat(SegmentServer): compaction * feat: batched write entries to file appendWithOffset() is idempotent. * fix: reconnect when peer is EOF * feat: raft host support loopback * refactor: SegmentServer recovery * refactor: rename checkoutState() to checkState() * refactor: startHeartbeatTask * fix: use timeout to avoid infinite block * feat: use buffer when send by peer --- internal/raft/log/compaction.go | 211 +++++++++++ internal/raft/log/log.go | 251 ++++++------- internal/raft/log/recovery.go | 169 +++++++-- internal/raft/log/wal.go | 71 ++++ internal/raft/transport/host.go | 14 +- internal/raft/transport/loopback.go | 40 +++ internal/raft/transport/peer.go | 51 ++- internal/store/segment/block/block.go | 18 +- internal/store/segment/block/file.go | 114 +++++- internal/store/segment/block/recovery.go | 74 ++++ internal/store/segment/block/replica.go | 137 ++++--- internal/store/segment/block/replica_test.go | 4 +- internal/store/segment/recovery.go | 113 ++++++ internal/store/segment/segment.go | 355 ++++++++----------- internal/store/wal/compaction.go | 19 + internal/store/wal/stream.go | 9 +- 16 files changed, 1222 insertions(+), 428 deletions(-) create mode 100644 internal/raft/log/compaction.go create mode 100644 internal/raft/log/wal.go create mode 100644 internal/raft/transport/loopback.go create mode 100644 internal/store/segment/block/recovery.go create mode 100644 internal/store/segment/recovery.go create mode 100644 internal/store/wal/compaction.go diff --git a/internal/raft/log/compaction.go b/internal/raft/log/compaction.go new file mode 100644 index 000000000..25b9844c5 --- /dev/null +++ b/internal/raft/log/compaction.go @@ -0,0 +1,211 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + // standard libraries. + "context" + "encoding/binary" + "fmt" + "time" + + // first-party libraries. + "github.com/linkall-labs/raft" + "github.com/linkall-labs/raft/raftpb" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/internal/store/meta" + "github.com/linkall-labs/vanus/observability/log" +) + +// Compact discards all log entries prior to compactIndex. +// It is the application's responsibility to not attempt to compact an index +// greater than raftLog.applied. +func (l *Log) Compact(i uint64) error { + l.Lock() + defer l.Unlock() + + ci := l.compactedIndex() + if i <= ci { + log.Warning(context.Background(), "raft log has been compacted", map[string]interface{}{}) + return raft.ErrCompacted + } + if i > l.lastIndex() { + log.Error(context.Background(), "conpactedIndex is out of bound lastIndex", map[string]interface{}{ + "compactedIndex": i, + "lastIndex": l.lastIndex(), + }) + // FIXME(james.yin): error + return raft.ErrCompacted + } + + sz := i - ci + remaining := l.length() - sz + ents := make([]raftpb.Entry, 1, 1+remaining) + offs := make([]int64, 1, 1+remaining) + + // Save compact information to dummy entry. + ents[0].Index = l.ents[sz].Index + ents[0].Term = l.ents[sz].Term + last := l.offs[0] + + // Copy remained entries. + if sz < l.length() { + ents = append(ents, l.ents[sz+1:]...) + offs = append(offs, l.offs[sz+1:]...) + } + + // Reset log entries. + l.ents = ents + l.offs = offs + + // Compact WAL. + var compact int64 + if remaining != 0 { + compact = offs[1] + } + offs[0] = compact + l.wal.tryCompact(compact, last, l.nodeID, l.ents[0].Index, l.ents[0].Term) + + return nil +} + +func (w *WAL) suppressCompact(cb exeCallback) error { + result := make(chan error, 1) + w.exec <- exeTask{ + cb: cb, + result: result, + } + return <-result +} + +func (w *WAL) tryCompact(offset, last int64, nodeID vanus.ID, index, term uint64) { + w.exec <- exeTask{ + cb: func() (compactTask, error) { + return compactTask{ + offset: offset, + last: last, + nodeID: nodeID, + info: compactInfo{ + index: index, + term: term, + }, + }, nil + }, + } +} + +type logCompactInfos map[vanus.ID]compactInfo + +var _ meta.Ranger = (logCompactInfos)(nil) + +func (i logCompactInfos) Range(cb meta.RangeCallback) error { + value := make([]byte, 16) + for id := range i { + key := fmt.Sprintf("block/%020d/compact", id.Uint64()) + binary.BigEndian.PutUint64(value[0:8], i[id].index) + binary.BigEndian.PutUint64(value[8:16], i[id].term) + if err := cb([]byte(key), value); err != nil { + return err + } + } + return nil +} + +type compactMeta struct { + infos logCompactInfos + offset int64 +} + +var _ meta.Ranger = (*compactMeta)(nil) + +func (m *compactMeta) Range(cb meta.RangeCallback) error { + if err := m.infos.Range(cb); err != nil { + return err + } + if m.offset != 0 { + if err := cb(walCompactKey, m.offset); err != nil { + return err + } + } + return nil +} + +var emptyMark = struct{}{} + +func (w WAL) run() { + for task := range w.exec { + ct, err := task.cb() + if task.result != nil { + if err != nil { + task.result <- err + continue + } + w.compactc <- ct + close(task.result) + } else if err == nil { + w.compactc <- ct + } + } +} + +func (w *WAL) runCompact() { + peroid := 30 * time.Second + ticker := time.NewTicker(peroid) + defer ticker.Stop() + + var compacted int64 + if v, ok := w.metaStore.Load(walCompactKey); ok { + compacted, _ = v.(int64) + } + + toCompact := compacted + infos := make(logCompactInfos) + for { + select { + case compact := <-w.compactc: + // Discard last barrier. + if compact.last != 0 { + w.barrier.Remove(compact.last) + } + // Set new barrier. + if compact.offset != 0 { + w.barrier.Set(compact.offset, emptyMark) + } + // Set compation info. + if compact.nodeID != 0 { + infos[compact.nodeID] = compact.info + } + if front := w.barrier.Front(); front != nil { + offset, _ := front.Key().(int64) + toCompact = offset + } + // TODO(james.yin): no log entry in WAL. + case <-ticker.C: + if toCompact > compacted || len(infos) != 0 { + log.Debug(context.TODO(), "compact WAL of raft log.", map[string]interface{}{ + "offset": toCompact, + }) + w.metaStore.BatchStore(&compactMeta{ + infos: infos, + offset: toCompact, + }) + compacted = toCompact + infos = make(logCompactInfos) + } + } + } +} diff --git a/internal/raft/log/log.go b/internal/raft/log/log.go index 9f35667e3..fbd2d1ea2 100644 --- a/internal/raft/log/log.go +++ b/internal/raft/log/log.go @@ -17,70 +17,133 @@ package log import ( // standard libraries. "context" + "fmt" "sync" - // third-party libraries. + // first-party libraries. "github.com/linkall-labs/raft" "github.com/linkall-labs/raft/raftpb" // this project. "github.com/linkall-labs/vanus/internal/primitive/vanus" - walog "github.com/linkall-labs/vanus/internal/store/wal" + "github.com/linkall-labs/vanus/internal/store/meta" "github.com/linkall-labs/vanus/observability/log" ) type Log struct { // Protects access to all fields. Most methods of Log are - // run on the raft goroutine, but Append() is run on an application - // goroutine. + // run on the raft goroutine, but Append() and Compact() is run on an + // application goroutine. sync.RWMutex nodeID vanus.ID - hardState raftpb.HardState - confState raftpb.ConfState - // ents[0] is a dummy entry, which record compact information. // ents[i] has raft log position i+snapshot.Metadata.Index. ents []raftpb.Entry // offs[i] is the offset of ents[i] in WAL. offs []int64 - wal *walog.WAL + wal *WAL + + metaStore *meta.SyncStore + offsetStore *meta.AsyncStore + + prevHardSt raftpb.HardState + + hsKey []byte + offKey []byte + csKey []byte + appKey []byte } // Make sure Log implements raft.Storage. var _ raft.Storage = (*Log)(nil) // NewLog creates an empty Log. -func NewLog(nodeID vanus.ID, wal *walog.WAL, peers []uint64) *Log { +func NewLog(nodeID vanus.ID, wal *WAL, metaStore *meta.SyncStore, offsetStore *meta.AsyncStore) *Log { return &Log{ nodeID: nodeID, - confState: raftpb.ConfState{ - Voters: peers, - }, // When starting from scratch populate the list with a dummy entry at term zero. - ents: make([]raftpb.Entry, 1), - offs: make([]int64, 1), - wal: wal, + ents: make([]raftpb.Entry, 1), + offs: make([]int64, 1), + wal: wal, + metaStore: metaStore, + offsetStore: offsetStore, + hsKey: []byte(fmt.Sprintf("block/%020d/hardState", nodeID.Uint64())), + offKey: []byte(fmt.Sprintf("block/%020d/commit", nodeID.Uint64())), + csKey: []byte(fmt.Sprintf("block/%020d/confState", nodeID.Uint64())), + appKey: []byte(fmt.Sprintf("block/%020d/applied", nodeID.Uint64())), } } // InitialState returns the saved HardState and ConfState information. func (l *Log) InitialState() (raftpb.HardState, raftpb.ConfState, error) { - l.RLock() - defer l.RUnlock() - return l.hardState, l.confState, nil + hs, err := l.recoverHardState() + if err != nil { + return raftpb.HardState{}, raftpb.ConfState{}, err + } + if compacted := l.Compacted(); hs.Commit < compacted { + hs.Commit = compacted + } + l.prevHardSt = hs + + cs, err := l.recoverConfState() + if err != nil { + return raftpb.HardState{}, raftpb.ConfState{}, err + } + + return hs, cs, nil +} + +// HardState returns the saved HardState. +// +// NOTE: This method is not thread-safty, it must be used in goroutine which call SetHardState!!! +func (l *Log) HardState() raftpb.HardState { + return l.prevHardSt } // SetHardState saves the current HardState. -func (l *Log) SetHardState(st raftpb.HardState) error { - l.Lock() - defer l.Unlock() - l.hardState = st +func (l *Log) SetHardState(hs raftpb.HardState) (err error) { + if hs.Term != l.prevHardSt.Term || hs.Vote != l.prevHardSt.Vote { + var data []byte + if data, err = hs.Marshal(); err != nil { + return err + } + l.metaStore.Store(l.hsKey, data) + l.prevHardSt = hs + } else { + l.offsetStore.Store(l.offKey, hs.Commit) + l.prevHardSt.Commit = hs.Commit + } + return nil +} + +func (l *Log) SetConfState(cs raftpb.ConfState) error { + data, err := cs.Marshal() + if err != nil { + return err + } + l.metaStore.Store(l.csKey, data) return nil } +func (l *Log) Applied() uint64 { + applied := l.recoverApplied() + if compacted := l.Compacted(); applied < compacted { + return compacted + } + return applied +} + +func (l *Log) SetApplied(app uint64) { + l.offsetStore.Store(l.appKey, app) +} + +func (l *Log) Compacted() uint64 { + return l.ents[0].Index +} + // Entries returns a slice of log entries in the range [lo,hi). // MaxSize limits the total size of the log entries returned, but // Entries returns at least one entry if any. @@ -208,78 +271,6 @@ func (l *Log) ApplySnapshot(snap raftpb.Snapshot) error { return nil } -// CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and -// can be used to reconstruct the state at that point. -// If any configuration changes have been made since the last compaction, -// the result of the last ApplyConfChange must be passed in. -func (l *Log) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) (raftpb.Snapshot, error) { - l.Lock() - defer l.Unlock() - - //if i <= l.snapshot.Metadata.Index { - // log.Warning(context.Background(), "snapshot is out of date", map[string]interface{}{}) - // return raftpb.Snapshot{}, raft.ErrSnapOutOfDate - //} - - //if i > l.lastIndex() { - // log.Error(context.Background(), "snampshotIndex is out of bound lastIndex", map[string]interface{}{ - // "snapshotIndex": i, - // "lastIndex": l.lastIndex(), - // }) - // // FIXME(james.yin): error - // return raftpb.Snapshot{}, raft.ErrSnapOutOfDate - //} - - //l.snapshot.Metadata.Index = i - //l.snapshot.Metadata.Term = l.ents[i-l.compactedIndex()].Term - //if cs != nil { - // l.snapshot.Metadata.ConfState = *cs - //} - //l.snapshot.Data = data - //return l.snapshot, nil - - return raftpb.Snapshot{}, raft.ErrSnapshotTemporarilyUnavailable -} - -// Compact discards all log entries prior to compactIndex. -// It is the application's responsibility to not attempt to compact an index -// greater than raftLog.applied. -func (l *Log) Compact(i uint64) error { - l.Lock() - defer l.Unlock() - - ci := l.compactedIndex() - if i <= ci { - log.Warning(context.Background(), "raft log has been compacted", map[string]interface{}{}) - return raft.ErrCompacted - } - if i > l.lastIndex() { - log.Error(context.Background(), "conpactedIndex is out of bound lastIndex", map[string]interface{}{ - "compactedIndex": i, - "lastIndex": l.lastIndex(), - }) - // FIXME(james.yin): error - return raft.ErrCompacted - } - - sz := i - ci - ents := make([]raftpb.Entry, 1, 1+l.length()-sz) - - // save compact information to dummy entry - ents[0].Index = l.ents[sz].Index - ents[0].Term = l.ents[sz].Term - - // copy remained entries - if sz < l.length() { - ents = append(ents, l.ents[sz+1:]...) - } - - // reset log entries - l.ents = ents - - return nil -} - // Append the new entries to storage. func (l *Log) Append(entries []raftpb.Entry) error { if len(entries) == 0 { @@ -293,6 +284,9 @@ func (l *Log) Append(entries []raftpb.Entry) error { // FIXME(james.yin): error return raft.ErrUnavailable } + if entries[i].Term != entries[i-1].Term { + entries[i].PrevTerm = entries[i-1].PrevTerm + } } l.Lock() @@ -324,8 +318,35 @@ func (l *Log) Append(entries []raftpb.Entry) error { firstToAppend = entries[0].Index } + pi := firstToAppend - firstInLog + if entries[0].Term != l.ents[pi].Term { + entries[0].PrevTerm = l.ents[pi].Term + } + // Append to WAL. - offsets, err := l.appendToWAL(entries) + var err error + var offsets []int64 + func() { + l.Unlock() + defer l.Lock() + + if firstToAppend == firstInLog { + if l.wal.suppressCompact(func() (compactTask, error) { + if offsets, err = l.appendToWAL(entries); err != nil { + return compactTask{}, err + } + return compactTask{ + offset: offsets[0], + last: l.offs[0], + }, nil + }) == nil { + // Record offset of first entry in WAL. + l.offs[0] = offsets[0] + } + } else { + offsets, err = l.appendToWAL(entries) + } + }() if err != nil { // FIXME(james.yin): correct error return err @@ -338,7 +359,7 @@ func (l *Log) Append(entries []raftpb.Entry) error { l.offs = append(l.offs, offsets...) } else { // truncate then append: firstToAppend < expectedToAppend - si := firstToAppend - firstInLog + si := pi + 1 l.ents = append([]raftpb.Entry{}, l.ents[:si]...) l.ents = append(l.ents, entries...) l.offs = append([]int64{}, l.offs[:si]...) @@ -361,41 +382,3 @@ func (l *Log) appendToWAL(entries []raftpb.Entry) ([]int64, error) { } return l.wal.Append(ents) } - -func (l *Log) appendInRecovery(entry raftpb.Entry, offset int64) error { - firstInLog := l.firstIndex() - expectedToAppend := l.lastIndex() + 1 - index := entry.Index - - if expectedToAppend < index { - log.Error(context.Background(), "missing log entries", map[string]interface{}{ - "lastIndex": expectedToAppend - 1, - "appendedIndex": index, - }) - // FIXME(james.yin): correct error - return raft.ErrUnavailable - } - - // write to cache - if index == expectedToAppend { - // append - l.ents = append(l.ents, entry) - l.offs = append(l.offs, offset) - } else if index < firstInLog { - // reset - l.ents = []raftpb.Entry{{ - Index: entry.Index - 1, - // TODO(james.yin): set Term - }, entry} - l.offs = []int64{0} - } else { - // truncate then append - si := index - firstInLog - l.ents = append([]raftpb.Entry{}, l.ents[:si]...) - l.ents = append(l.ents, entry) - l.offs = append([]int64{}, l.offs[:si]...) - l.offs = append(l.offs, offset) - } - - return nil -} diff --git a/internal/raft/log/recovery.go b/internal/raft/log/recovery.go index 81b7e9e6c..b2ec7e2c9 100644 --- a/internal/raft/log/recovery.go +++ b/internal/raft/log/recovery.go @@ -15,46 +15,175 @@ package log import ( - // first-party libraries + // standard libraries. + "context" + "encoding/binary" + "fmt" + + // first-party libraries. + "github.com/linkall-labs/raft" "github.com/linkall-labs/raft/raftpb" - // this project + // this project. "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/internal/store/meta" walog "github.com/linkall-labs/vanus/internal/store/wal" + "github.com/linkall-labs/vanus/observability/log" ) -func RecoverLogsAndWAL(walDir string) (map[vanus.ID]*Log, *walog.WAL, error) { - raftLogs := make(map[uint64]*Log) - // TODO(james.yin): visit from compacted offset - wal, err := walog.RecoverWithVisitor(walDir, 0, func(data []byte, offset int64) error { +func RecoverLogsAndWAL(walDir string, metaStore *meta.SyncStore, offsetStore *meta.AsyncStore) (map[vanus.ID]*Log, *WAL, error) { + var compacted int64 + if v, exist := metaStore.Load(walCompactKey); exist { + var ok bool + if compacted, ok = v.(int64); !ok { + panic("raftLog: compacted is not int64") + } + } + + logs := make(map[uint64]*Log) + wal, err := walog.RecoverWithVisitor(walDir, compacted, func(data []byte, offset int64) error { var entry raftpb.Entry err := entry.Unmarshal(data) if err != nil { return err } - raftLog := raftLogs[entry.NodeId] - if raftLog == nil { - // TODO(james.yin): peers - raftLog = NewLog(vanus.NewIDFromUint64(entry.NodeId), nil, nil) - dummy := &raftLog.ents[0] - dummy.Index = entry.Index - 1 - // TODO(james.yin): set Term - raftLogs[entry.NodeId] = raftLog + l := logs[entry.NodeId] + if l == nil { + l = RecoverLog(vanus.NewIDFromUint64(entry.NodeId), nil, metaStore, offsetStore) + logs[entry.NodeId] = l } - return raftLog.appendInRecovery(entry, offset) + return l.appendInRecovery(entry, offset) }) if err != nil { return nil, nil, err } + wal2 := newWAL(wal, metaStore) // convert raftLogs, and set wal - raftLogs2 := make(map[vanus.ID]*Log, len(raftLogs)) - for nodeID, raftLog := range raftLogs { - raftLog.wal = wal - raftLogs2[vanus.NewIDFromUint64(nodeID)] = raftLog + logs2 := make(map[vanus.ID]*Log, len(logs)) + for nodeID, raftLog := range logs { + raftLog.wal = wal2 + // TODO(james.yin): move to compaction.go + if offset := raftLog.offs[0]; offset != 0 { + wal2.barrier.Set(offset, emptyMark) + } + logs2[vanus.NewIDFromUint64(nodeID)] = raftLog + } + + return logs2, wal2, nil +} + +func RecoverLog(blockID vanus.ID, wal *WAL, metaStore *meta.SyncStore, offsetStore *meta.AsyncStore) *Log { + l := NewLog(blockID, wal, metaStore, offsetStore) + l.recoverCompactionInfo() + return l +} + +func (l *Log) recoverHardState() (raftpb.HardState, error) { + var hs raftpb.HardState + if v, exist := l.metaStore.Load(l.hsKey); exist { + b, ok := v.([]byte) + if !ok { + panic("raftLog: hardState is not []byte") + } + if err := hs.Unmarshal(b); err != nil { + return raftpb.HardState{}, err + } + } + if v, exist := l.offsetStore.Load(l.offKey); exist { + c, ok := v.(uint64) + if !ok { + panic("raftLog: commit is not uint64") + } + if c > hs.Commit { + hs.Commit = c + } + } + return hs, nil +} + +func (l *Log) recoverConfState() (raftpb.ConfState, error) { + var cs raftpb.ConfState + if v, exist := l.metaStore.Load(l.csKey); exist { + b, ok := v.([]byte) + if !ok { + panic("raftLog: confState is not []byte") + } + if err := cs.Unmarshal(b); err != nil { + return raftpb.ConfState{}, err + } + } + return cs, nil +} + +func (l *Log) recoverApplied() uint64 { + if v, exist := l.offsetStore.Load(l.appKey); exist { + app, ok := v.(uint64) + if !ok { + panic("raftLog: applied is not uint64") + } + return app + } + return 0 +} + +func (l *Log) recoverCompactionInfo() { + key := fmt.Sprintf("block/%020d/compact", l.nodeID.Uint64()) + if v, ok := l.metaStore.Load([]byte(key)); ok { + if buf, ok2 := v.([]byte); ok2 { + dummy := &l.ents[0] + dummy.Index = binary.BigEndian.Uint64(buf[0:8]) + dummy.Term = binary.BigEndian.Uint64(buf[8:16]) + } else { + panic("raftLog: compact is not []byte") + } + } +} + +func (l *Log) appendInRecovery(entry raftpb.Entry, offset int64) error { + firstInLog := l.firstIndex() + expectedToAppend := l.lastIndex() + 1 + index := entry.Index + + if expectedToAppend < index { + log.Error(context.Background(), "missing log entries", map[string]interface{}{ + "lastIndex": expectedToAppend - 1, + "appendedIndex": index, + }) + // FIXME(james.yin): correct error + return raft.ErrUnavailable + } + + // Write to cache. + switch { + case index < firstInLog: + // Compacted entry, discard. + case index == firstInLog: + // First entry, reset compaction info. + if offset > l.offs[0] { + l.ents = []raftpb.Entry{{ + Index: index - 1, + Term: entry.Term, + }, entry} + if entry.PrevTerm != 0 { + l.ents[0].Term = entry.PrevTerm + } + l.offs = []int64{offset, offset} + } + case index == expectedToAppend: + // Append entry. + l.ents = append(l.ents, entry) + l.offs = append(l.offs, offset) + default: + // Truncate then append entry. + si := index - firstInLog + 1 + l.ents = append([]raftpb.Entry{}, l.ents[:si]...) + l.ents = append(l.ents, entry) + l.offs = append([]int64{}, l.offs[:si]...) + l.offs = append(l.offs, offset) } - return raftLogs2, wal, nil + return nil } diff --git a/internal/raft/log/wal.go b/internal/raft/log/wal.go new file mode 100644 index 000000000..24fe6fcde --- /dev/null +++ b/internal/raft/log/wal.go @@ -0,0 +1,71 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package log + +import ( + // third-party libraries. + "github.com/huandu/skiplist" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/internal/store/meta" + walog "github.com/linkall-labs/vanus/internal/store/wal" +) + +var ( + walCompactKey = []byte("wal/compact") +) + +type compactInfo struct { + index, term uint64 +} + +type compactTask struct { + offset, last int64 + nodeID vanus.ID + info compactInfo +} + +type exeCallback func() (compactTask, error) + +type exeTask struct { + cb exeCallback + result chan error +} + +type WAL struct { + *walog.WAL + + metaStore *meta.SyncStore + + barrier *skiplist.SkipList + exec chan exeTask + compactc chan compactTask +} + +func newWAL(wal *walog.WAL, metaStore *meta.SyncStore) *WAL { + w := &WAL{ + WAL: wal, + metaStore: metaStore, + barrier: skiplist.New(skiplist.Int64), + exec: make(chan exeTask, 256), + compactc: make(chan compactTask, 256), + } + + go w.run() + go w.runCompact() + + return w +} diff --git a/internal/raft/transport/host.go b/internal/raft/transport/host.go index 1414c0596..f48231298 100644 --- a/internal/raft/transport/host.go +++ b/internal/raft/transport/host.go @@ -15,11 +15,11 @@ package transport import ( - // standard libraries + // standard libraries. "context" "sync" - // third-party libraries + // third-party libraries. "github.com/linkall-labs/raft/raftpb" ) @@ -35,6 +35,7 @@ type host struct { receivers sync.Map resolver Resolver callback string + lo Multiplexer } // Make sure host implements Host. @@ -45,6 +46,10 @@ func NewHost(resolver Resolver, callback string) Host { resolver: resolver, callback: callback, } + h.lo = &loopback{ + addr: callback, + dmu: h, + } return h } @@ -70,6 +75,11 @@ func (h *host) resolveMultiplexer(ctx context.Context, to uint64, endpoint strin return nil } } + + if endpoint == h.callback { + return h.lo + } + if mux, ok := h.peers.Load(endpoint); ok { return mux.(*peer) } diff --git a/internal/raft/transport/loopback.go b/internal/raft/transport/loopback.go new file mode 100644 index 000000000..efddb1821 --- /dev/null +++ b/internal/raft/transport/loopback.go @@ -0,0 +1,40 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transport + +import ( + // standard libraries. + "context" + + // first-party libraries. + "github.com/linkall-labs/raft/raftpb" +) + +type loopback struct { + addr string + dmu Demultiplexer +} + +var _ Multiplexer = (*loopback)(nil) + +func (lo *loopback) Send(msg *raftpb.Message) { + _ = lo.dmu.Receive(context.TODO(), msg, lo.addr) +} + +func (lo *loopback) Sendv(msgs []*raftpb.Message) { + for _, msg := range msgs { + _ = lo.dmu.Receive(context.TODO(), msg, lo.addr) + } +} diff --git a/internal/raft/transport/peer.go b/internal/raft/transport/peer.go index cadd2774a..bddfdf63b 100644 --- a/internal/raft/transport/peer.go +++ b/internal/raft/transport/peer.go @@ -17,6 +17,9 @@ package transport import ( // standard libraries. "context" + "errors" + "io" + "time" // third-party libraries. "google.golang.org/grpc" @@ -27,9 +30,15 @@ import ( vsraftpb "github.com/linkall-labs/vsproto/pkg/raft" ) +const ( + defaultTimeoutMilliseconds = 300 + defaultMessageChainSize = 32 +) + type peer struct { addr string msgc chan *raftpb.Message + stream vsraftpb.RaftServer_SendMsssageClient ctx context.Context cancel context.CancelFunc } @@ -42,7 +51,7 @@ func newPeer(ctx context.Context, endpoint string, callback string) *peer { p := &peer{ addr: endpoint, - msgc: make(chan *raftpb.Message), + msgc: make(chan *raftpb.Message, defaultMessageChainSize), ctx: ctx, cancel: cancel, } @@ -53,30 +62,34 @@ func newPeer(ctx context.Context, endpoint string, callback string) *peer { } func (p *peer) run(callback string) { - opts := []grpc.DialOption{grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())} + opts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + } preface := raftpb.Message{ Context: []byte(callback), } - var stream vsraftpb.RaftServer_SendMsssageClient loop: for { var err error select { case msg := <-p.msgc: - // TODO(james.yin): reconnect + stream := p.stream if stream == nil { if stream, err = p.connect(opts...); err != nil { + p.processSendError(err) break } + p.stream = stream if err = stream.Send(&preface); err != nil { - // TODO(james.yin): handle err + p.processSendError(err) + break } } - err = stream.Send(msg) - if err != nil { - // TODO(james.yin): handle error + if err = stream.Send(msg); err != nil { + p.processSendError(err) break } case <-p.ctx.Done(): @@ -84,8 +97,16 @@ loop: } } - if stream != nil { - stream.CloseAndRecv() + if p.stream != nil { + _, _ = p.stream.CloseAndRecv() + } +} + +func (p *peer) processSendError(err error) { + // TODO(james.yin): report MsgUnreachable, backoff + if errors.Is(err, io.EOF) { + _, _ = p.stream.CloseAndRecv() + p.stream = nil } } @@ -96,8 +117,9 @@ func (p *peer) Close() { func (p *peer) Send(msg *raftpb.Message) { // TODO(james.yin): select { - case p.msgc <- msg: case <-p.ctx.Done(): + return + case p.msgc <- msg: } } @@ -108,10 +130,15 @@ func (p *peer) Sendv(msgs []*raftpb.Message) { } func (p *peer) connect(opts ...grpc.DialOption) (vsraftpb.RaftServer_SendMsssageClient, error) { - conn, err := grpc.DialContext(context.TODO(), p.addr, opts...) + timeout := defaultTimeoutMilliseconds * time.Millisecond + ctx, cancel := context.WithTimeout(p.ctx, timeout) + defer cancel() + + conn, err := grpc.DialContext(ctx, p.addr, opts...) if err != nil { return nil, err } + client := vsraftpb.NewRaftServerClient(conn) stream, err := client.SendMsssage(context.TODO()) if err != nil { diff --git a/internal/store/segment/block/block.go b/internal/store/segment/block/block.go index a72b8df04..9f81448b4 100644 --- a/internal/store/segment/block/block.go +++ b/internal/store/segment/block/block.go @@ -15,19 +15,20 @@ package block import ( - // standard libraries + // standard libraries. "context" "errors" + "io" "os" "path/filepath" - // third-party libraries + // third-party libraries. "go.uber.org/atomic" - // first-party libraries + // first-party libraries. "github.com/linkall-labs/vsproto/pkg/meta" - // this project + // this project. "github.com/linkall-labs/vanus/internal/primitive/vanus" "github.com/linkall-labs/vanus/observability" ) @@ -65,10 +66,11 @@ type SegmentBlock interface { HealthInfo() *meta.SegmentHealthInfo } -func CreateFileSegmentBlock(ctx context.Context, id vanus.ID, path string, capacity int64) (SegmentBlock, error) { +func CreateFileSegmentBlock(ctx context.Context, blockDir string, id vanus.ID, capacity int64) (SegmentBlock, error) { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) + path := resolvePath(blockDir, id) b := &fileBlock{ id: id, path: path, @@ -83,7 +85,7 @@ func CreateFileSegmentBlock(ctx context.Context, id vanus.ID, path string, capac if err = f.Truncate(capacity); err != nil { return nil, err } - if _, err = f.Seek(fileBlockHeaderSize, 0); err != nil { + if _, err = f.Seek(fileBlockHeaderSize, io.SeekStart); err != nil { return nil, err } b.appendable.Store(true) @@ -100,7 +102,9 @@ func CreateFileSegmentBlock(ctx context.Context, id vanus.ID, path string, capac func OpenFileSegmentBlock(ctx context.Context, path string) (SegmentBlock, error) { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - id, err := vanus.NewIDFromString(filepath.Base(path)) + + filename := filepath.Base(path) + id, err := vanus.NewIDFromString(filename[:len(filename)-len(blockExt)]) if err != nil { return nil, err } diff --git a/internal/store/segment/block/file.go b/internal/store/segment/block/file.go index 8256e577d..b33dada9b 100644 --- a/internal/store/segment/block/file.go +++ b/internal/store/segment/block/file.go @@ -15,25 +15,30 @@ package block import ( - // standard libraries + // standard libraries. "context" "encoding/binary" + "fmt" "os" + "path/filepath" "sync" "time" - // third-party libraries + // third-party libraries. "go.uber.org/atomic" - // first-party libraries + // first-party libraries. "github.com/linkall-labs/vsproto/pkg/meta" - // this project + // this project. "github.com/linkall-labs/vanus/internal/primitive/vanus" + "github.com/linkall-labs/vanus/internal/store/segment/errors" "github.com/linkall-labs/vanus/observability" + "github.com/linkall-labs/vanus/observability/log" ) const ( + blockExt = ".block" fileBlockHeaderSize = 4 * 1024 // version + capacity + size + number + full @@ -41,6 +46,10 @@ const ( entryLengthSize = 4 ) +func resolvePath(blockDir string, id vanus.ID) string { + return filepath.Join(blockDir, fmt.Sprintf("%020d%s", id.Uint64(), blockExt)) +} + type fileBlock struct { version int32 id vanus.ID @@ -146,7 +155,102 @@ func (b *fileBlock) Append(ctx context.Context, entities ...Entry) error { return nil } -// Read date from file +func (b *fileBlock) appendWithOffset(ctx context.Context, entries ...Entry) error { + if len(entries) == 0 { + return nil + } + + num := uint32(b.num.Load()) + for i := 0; i < len(entries); i++ { + switch entry := &entries[i]; { + case entry.Index < num: + log.Warning(ctx, "block: entry index less than block num, skip this entry.", map[string]interface{}{ + "index": entry.Index, + "num": num, + }) + continue + case entry.Index > num: + log.Error(ctx, "block: entry index greater than block num.", map[string]interface{}{ + "index": entry.Index, + "num": num, + }) + return errors.ErrInternal + } + if i != 0 { + entries = entries[i:] + } + break + } + if len(entries) == 0 { + return nil + } + + wo := uint32(b.wo.Load()) + offset := entries[0].Offset + if offset != wo { + log.Error(ctx, "block: entry offset is not equal than block wo.", map[string]interface{}{ + "offset": offset, + "wo": wo, + }) + return errors.ErrInternal + } + + for i := 1; i < len(entries); i++ { + entry := &entries[i] + prev := &entries[i-1] + if prev.Index+1 != entry.Index { + log.Error(ctx, "block: entry index is discontinuous.", map[string]interface{}{ + "index": entry.Index, + "prev": prev.Index, + }) + return errors.ErrInternal + } + if prev.Offset+uint32(prev.Size()) != entry.Offset { + log.Error(ctx, "block: entry offset is discontinuous.", map[string]interface{}{ + "offset": entry.Offset, + "prev": prev.Offset, + }) + return errors.ErrInternal + } + } + + last := &entries[len(entries)-1] + length := int(last.Offset-offset) + last.Size() + + // Check free space. + if length+v1IndexLength*len(entries) > b.remaining() { + b.full.Store(true) + return ErrNoEnoughCapacity + } + + buf := make([]byte, length) + indexs := make([]index, 0, len(entries)) + for _, entry := range entries { + n, _ := entry.MarshalTo(buf[entry.Offset-offset:]) + indexs = append(indexs, index{ + offset: int64(entry.Offset), + length: int32(n), + }) + } + + n, err := b.f.WriteAt(buf, int64(offset)) + if err != nil { + return err + } + + b.indexes = append(b.indexes, indexs...) + + b.num.Add(int32(len(entries))) + b.wo.Add(int64(n)) + b.size.Add(int64(n)) + + //if err = b.physicalFile.Sync(); err != nil { + // return err + //} + + return nil +} + func (b *fileBlock) Read(ctx context.Context, entityStartOffset, number int) ([]Entry, error) { observability.EntryMark(ctx) b.uncompletedReadRequestCount.Add(1) diff --git a/internal/store/segment/block/recovery.go b/internal/store/segment/block/recovery.go new file mode 100644 index 000000000..118da8369 --- /dev/null +++ b/internal/store/segment/block/recovery.go @@ -0,0 +1,74 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package block + +import ( + // standard libraries. + "os" + "path/filepath" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/vanus" +) + +const ( + defaultDirPerm = 0755 +) + +func RecoverBlocks(blockDir string) (map[vanus.ID]string, error) { + // Make sure the block directory exists. + if err := os.MkdirAll(blockDir, defaultDirPerm); err != nil { + return nil, err + } + + files, err := os.ReadDir(blockDir) + if err != nil { + return nil, err + } + files = filterRegularBlock(files) + + blocks := make(map[vanus.ID]string, len(files)) + for _, file := range files { + filename := file.Name() + blockID, err2 := vanus.NewIDFromString(filename[:len(filename)-len(blockExt)]) + if err2 != nil { + return nil, err2 + } + + path := filepath.Join(blockDir, filename) + blocks[blockID] = path + } + return blocks, nil +} + +func filterRegularBlock(entries []os.DirEntry) []os.DirEntry { + if len(entries) == 0 { + return entries + } + + n := 0 + for _, entry := range entries { + if !entry.Type().IsRegular() { + continue + } + if filepath.Ext(entry.Name()) != blockExt { + continue + } + entries[n] = entry + n++ + } + entries = entries[:n] + return entries +} diff --git a/internal/store/segment/block/replica.go b/internal/store/segment/block/replica.go index d4fe5d11e..67561b2b5 100644 --- a/internal/store/segment/block/replica.go +++ b/internal/store/segment/block/replica.go @@ -72,10 +72,6 @@ var _ transport.Receiver = (*Replica)(nil) func NewReplica(ctx context.Context, block SegmentBlock, raftLog *raftlog.Log, sender transport.Sender) *Replica { blockID := block.SegmentBlockID() - // TODO(james.yin): Recover the in-memory storage from persistent snapshot, state and entries. - // log.ApplySnapshot(snapshot) - // log.SetHardState(state) - ctx, cancel := context.WithCancel(ctx) r := &Replica{ @@ -94,8 +90,11 @@ func NewReplica(ctx context.Context, block SegmentBlock, raftLog *raftlog.Log, s ElectionTick: 10, HeartbeatTick: 3, Storage: raftLog, + Applied: raftLog.Applied(), + Compacted: raftLog.Compacted(), MaxSizePerMsg: 4096, MaxInflightMsgs: 256, + PreVote: true, DisableProposalForwarding: true, } r.node = raft.RestartNode(c) @@ -130,15 +129,15 @@ func (r *Replica) run() { t := time.NewTicker(100 * time.Millisecond) defer t.Stop() - var hardState raftpb.HardState for { select { case <-t.C: r.node.Tick() case rd := <-r.node.Ready(): - // TODO(james.yin): hard state if !raft.IsEmptyHardState(rd.HardState) { - hardState = rd.HardState + if err := r.log.SetHardState(rd.HardState); err != nil { + panic(err) + } } if err := r.log.Append(rd.Entries); err != nil { @@ -150,7 +149,7 @@ func (r *Replica) run() { isLeader := rd.SoftState.RaftState == raft.StateLeader // reset when become leader if isLeader { - r.reset(hardState) + r.reset() } r.isLeader = isLeader } @@ -159,42 +158,54 @@ func (r *Replica) run() { // are committed to stable storage. r.send(rd.Messages) + var applied uint64 + // TODO(james.yin): snapshot if !raft.IsEmptySnap(rd.Snapshot) { // processSnapshot(rd.Snapshot) + // applied = rd.Snapshot.Metadata.Index } - for _, entrypb := range rd.CommittedEntries { - if entrypb.Type == raftpb.EntryNormal { - // Skip empty entry(raft heartbeat). - if len(entrypb.Data) > 0 { - r.doAppend(entrypb) + if num := len(rd.CommittedEntries); num != 0 { + var cs *raftpb.ConfState + + entries := make([]Entry, 0, num) + for i := range rd.CommittedEntries { + entrypb := &rd.CommittedEntries[i] + + if entrypb.Type == raftpb.EntryNormal { + // Skip empty entry(raft heartbeat). + if len(entrypb.Data) > 0 { + entry := UnmarshalWithOffsetAndIndex(entrypb.Data) + entries = append(entries, entry) + } + continue } - continue + + // Change membership. + cs = r.applyConfChange(entrypb) + } + + if len(entries) > 0 { + r.doAppend(entries...) } - // change membership - var cci raftpb.ConfChangeI - if entrypb.Type == raftpb.EntryConfChange { - var cc raftpb.ConfChange - if err := cc.Unmarshal(entrypb.Data); err != nil { + // ConfState is changed. + if cs != nil { + if err := r.log.SetConfState(*cs); err != nil { panic(err) } - // TODO(james.yin): non-add - r.hintEndpoint(cc.NodeID, string(cc.Context)) - cci = cc - } else { - var cc raftpb.ConfChangeV2 - if err := cc.Unmarshal(entrypb.Data); err != nil { - panic(nil) - } - // TODO(james.yin): non-add - for _, ccs := range cc.Changes { - r.hintEndpoint(ccs.NodeID, string(cc.Context)) - } - cci = cc } - r.node.ApplyConfChange(cci) + + applied = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index + } + + if applied != 0 { + r.log.SetApplied(applied) + } + + if rd.Compact != 0 { + _ = r.log.Compact(rd.Compact) } r.node.Advance() @@ -206,10 +217,39 @@ func (r *Replica) run() { } } -func (r *Replica) reset(hardState raftpb.HardState) { +func (r *Replica) applyConfChange(entrypb *raftpb.Entry) *raftpb.ConfState { + if entrypb.Type == raftpb.EntryNormal { + // TODO(james.yin): return error + return nil + } + + var cci raftpb.ConfChangeI + if entrypb.Type == raftpb.EntryConfChange { + var cc raftpb.ConfChange + if err := cc.Unmarshal(entrypb.Data); err != nil { + panic(err) + } + // TODO(james.yin): non-add + r.hintEndpoint(cc.NodeID, string(cc.Context)) + cci = cc + } else { + var cc raftpb.ConfChangeV2 + if err := cc.Unmarshal(entrypb.Data); err != nil { + panic(err) + } + // TODO(james.yin): non-add + for _, ccs := range cc.Changes { + r.hintEndpoint(ccs.NodeID, string(cc.Context)) + } + cci = cc + } + return r.node.ApplyConfChange(cci) +} + +func (r *Replica) reset() { off, err := r.log.LastIndex() if err != nil { - off = hardState.Commit + off = r.log.HardState().Commit } for off > 0 { @@ -228,7 +268,7 @@ func (r *Replica) reset(hardState raftpb.HardState) { break } - off -= 1 + off-- } // no normal entry @@ -317,18 +357,27 @@ func (r *Replica) preAppend(ctx context.Context, entries []Entry) error { return nil } -func (r *Replica) doAppend(entrypb raftpb.Entry) { - entry := UnmarshalWithOffsetAndIndex(entrypb.Data) - - // TODO(james.yin): full - if len(entry.Payload) <= 0 { - r.full = true - r.block.full.Store(true) +func (r *Replica) doAppend(entries ...Entry) { + num := len(entries) + if num == 0 { return } + last := &entries[num-1] + if len(last.Payload) == 0 { + entries = entries[:num-1] + } else { + last = nil + } + // FIXME(james.yin): context - r.block.Append(context.Background(), entry) + r.block.appendWithOffset(context.TODO(), entries...) + + // Mark full. + if last != nil { + r.full = true + r.block.full.Store(true) + } } func (r *Replica) send(msgs []raftpb.Message) { diff --git a/internal/store/segment/block/replica_test.go b/internal/store/segment/block/replica_test.go index c714b441b..6920ebcc1 100644 --- a/internal/store/segment/block/replica_test.go +++ b/internal/store/segment/block/replica_test.go @@ -15,10 +15,10 @@ package block import ( - // standard libraries + // standard libraries. "testing" - // third-party libraries + // third-party libraries. . "github.com/smartystreets/goconvey/convey" ) diff --git a/internal/store/segment/recovery.go b/internal/store/segment/recovery.go new file mode 100644 index 000000000..d4b153df4 --- /dev/null +++ b/internal/store/segment/recovery.go @@ -0,0 +1,113 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package segment + +import ( + // standard libraries. + "context" + "path/filepath" + + // this project. + "github.com/linkall-labs/vanus/internal/primitive/vanus" + raftlog "github.com/linkall-labs/vanus/internal/raft/log" + "github.com/linkall-labs/vanus/internal/store/meta" + "github.com/linkall-labs/vanus/internal/store/segment/block" + "github.com/linkall-labs/vanus/observability/log" +) + +func (s *segmentServer) recover(ctx context.Context) error { + metaStore, err := meta.RecoverSyncStore(filepath.Join(s.volumeDir, "meta")) + if err != nil { + return err + } + s.metaStore = metaStore + + offsetStore, err := meta.RecoverAsyncStore(filepath.Join(s.volumeDir, "offset")) + if err != nil { + return err + } + s.offsetStore = offsetStore + + // Recover wal and raft log. + raftLogs, wal, err := raftlog.RecoverLogsAndWAL(filepath.Join(s.volumeDir, "raft"), metaStore, offsetStore) + if err != nil { + return err + } + s.wal = wal + + if err = s.recoverBlocks(ctx, raftLogs); err != nil { + return err + } + + return nil +} + +func (s *segmentServer) recoverBlocks(ctx context.Context, raftLogs map[vanus.ID]*raftlog.Log) error { + blocks, err := block.RecoverBlocks(filepath.Join(s.volumeDir, "block")) + if err != nil { + return err + } + + // TODO: optimize this, because the implementation assumes under storage is linux file system + for blockID, path := range blocks { + b, err := block.OpenFileSegmentBlock(ctx, path) + if err != nil { + return err + } + + log.Info(ctx, "The block was loaded.", map[string]interface{}{ + "blockID": blockID, + }) + // TODO(james.yin): initialize block + if err = b.Initialize(ctx); err != nil { + return err + } + + s.blocks.Store(blockID, b) + + // recover replica + if b.IsAppendable() { + raftLog := raftLogs[blockID] + // raft log has been compacted + if raftLog == nil { + raftLog = raftlog.RecoverLog(blockID, s.wal, s.metaStore, s.offsetStore) + } + replica := s.makeReplicaWithRaftLog(context.TODO(), b, raftLog) + s.blockWriters.Store(b.SegmentBlockID(), replica) + } + s.blockReaders.Store(b.SegmentBlockID(), b) + } + + for nodeID, raftLog := range raftLogs { + b, ok := s.blocks.Load(nodeID) + if !ok { + // TODO(james.yin): no block for raft log, compact + log.Debug(ctx, "Not found block, so discard the raft log.", map[string]interface{}{ + "nodeID": nodeID, + }) + continue + } + if _, ok = b.(*block.Replica); !ok { + // TODO(james.yin): block is not appendable, compact + log.Debug(ctx, "Block is not appendable, so discard the raft log.", map[string]interface{}{ + "nodeID": nodeID, + }) + continue + } + _ = raftLog + } + + return nil +} diff --git a/internal/store/segment/segment.go b/internal/store/segment/segment.go index a32366948..32b5b44e0 100644 --- a/internal/store/segment/segment.go +++ b/internal/store/segment/segment.go @@ -15,38 +15,37 @@ package segment import ( - // standard libraries + // standard libraries. "context" "fmt" "os" "path/filepath" - "strconv" "strings" "sync" "time" - // third-party libraries + // third-party libraries. cepb "cloudevents.io/genproto/v1" "github.com/golang/protobuf/proto" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/emptypb" - // first-party libraries + // first-party libraries. ctrlpb "github.com/linkall-labs/vsproto/pkg/controller" metapb "github.com/linkall-labs/vsproto/pkg/meta" raftpb "github.com/linkall-labs/vsproto/pkg/raft" segpb "github.com/linkall-labs/vsproto/pkg/segment" - // this project + // this project. "github.com/linkall-labs/vanus/internal/primitive" "github.com/linkall-labs/vanus/internal/primitive/vanus" raftlog "github.com/linkall-labs/vanus/internal/raft/log" "github.com/linkall-labs/vanus/internal/raft/transport" "github.com/linkall-labs/vanus/internal/store" + "github.com/linkall-labs/vanus/internal/store/meta" "github.com/linkall-labs/vanus/internal/store/segment/block" "github.com/linkall-labs/vanus/internal/store/segment/errors" - walog "github.com/linkall-labs/vanus/internal/store/wal" "github.com/linkall-labs/vanus/internal/util" errutil "github.com/linkall-labs/vanus/internal/util/errors" "github.com/linkall-labs/vanus/observability" @@ -61,7 +60,9 @@ type segmentServer struct { blocks sync.Map blockWriters sync.Map blockReaders sync.Map - wal *walog.WAL + wal *raftlog.WAL + metaStore *meta.SyncStore + offsetStore *meta.AsyncStore resolver *transport.SimpleResolver host transport.Host @@ -72,7 +73,7 @@ type segmentServer struct { cfg store.Config localAddress string - volumeId vanus.ID + volumeID vanus.ID volumeDir string ctrlAddress []string @@ -99,7 +100,7 @@ func NewSegmentServer(cfg store.Config, stop func()) (segpb.SegmentServerServer, state: primitive.ServerStateCreated, cfg: cfg, localAddress: localAddress, - volumeId: cfg.Volume.ID, + volumeID: cfg.Volume.ID, volumeDir: cfg.Volume.Dir, resolver: resolver, host: host, @@ -112,23 +113,16 @@ func NewSegmentServer(cfg store.Config, stop func()) (segpb.SegmentServerServer, } func (s *segmentServer) Initialize(ctx context.Context) error { - // recover wal and raft log - raftLogs, wal, err := raftlog.RecoverLogsAndWAL(filepath.Join(s.volumeDir, "raft")) - if err != nil { - return err - } - s.wal = wal - - blockPeers, err := s.registerSelf(ctx) - if err != nil { + if err := s.recover(ctx); err != nil { return err } - if err = s.recoverBlocks(ctx, blockPeers, raftLogs); err != nil { + if err := s.registerSelf(ctx); err != nil { return err } s.state = primitive.ServerStateStarted + return nil } @@ -176,20 +170,29 @@ func (s *segmentServer) CreateBlock(ctx context.Context, observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } + if req.Id == 0 { + log.Warning(ctx, "Can not create block without id.", nil) + return nil, errors.ErrInvalidRequest.WithMessage("can not create block without id.") + } + + log.Info(ctx, "Create block.", map[string]interface{}{ + "blockID": req.Id, + "size": req.Size, + }) + blockID := vanus.NewIDFromUint64(req.Id) _, exist := s.blocks.Load(blockID) if exist { - return nil, errors.ErrResourceAlreadyExist.WithMessage("the segment has already exist") + return nil, errors.ErrResourceAlreadyExist.WithMessage("the segment has already exist.") } // create block - path := s.generateNewBlockPath(blockID) - b, err := block.CreateFileSegmentBlock(ctx, blockID, path, req.Size) + b, err := block.CreateFileSegmentBlock(ctx, filepath.Join(s.volumeDir, "block"), blockID, req.Size) if err != nil { return nil, err } @@ -209,7 +212,7 @@ func (s *segmentServer) RemoveBlock(ctx context.Context, observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } @@ -224,42 +227,62 @@ func (s *segmentServer) ActivateSegment(ctx context.Context, observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } - // bootstrap replica - if len(req.Replicas) > 0 { - var myID vanus.ID - var peers []block.IDAndEndpoint - for blockID, endpoint := range req.Replicas { - peer := vanus.NewIDFromUint64(blockID) - peers = append(peers, block.IDAndEndpoint{ - ID: peer, - Endpoint: endpoint, - }) - if endpoint == s.localAddress { - myID = peer - } else { - // register peer - s.resolver.Register(blockID, endpoint) - } - } + if len(req.Replicas) == 0 { + log.Warning(ctx, "Replicas can not be empty.", map[string]interface{}{ + "eventLogID": req.EventLogId, + "replicaGroupID": req.ReplicaGroupId, + }) + return &segpb.ActivateSegmentResponse{}, nil + } - if myID == 0 { - return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist") - } - v, exist := s.blockWriters.Load(myID) - if !exist { - return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist") - } + log.Info(ctx, "Activate segment.", map[string]interface{}{ + "eventLogID": req.EventLogId, + "replicaGroupID": req.ReplicaGroupId, + "replicas": req.Replicas, + }) - replica := v.(*block.Replica) - if err := replica.Bootstrap(peers); err != nil { - return nil, err + var myID vanus.ID + peers := make([]block.IDAndEndpoint, 0, len(req.Replicas)-1) + for blockID, endpoint := range req.Replicas { + peer := vanus.NewIDFromUint64(blockID) + peers = append(peers, block.IDAndEndpoint{ + ID: peer, + Endpoint: endpoint, + }) + if endpoint == s.localAddress { + myID = peer } } + if myID == 0 { + return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist") + } + v, exist := s.blockWriters.Load(myID) + if !exist { + return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist") + } + + // Register peers. + for i := range peers { + peer := &peers[i] + s.resolver.Register(peer.ID.Uint64(), peer.Endpoint) + } + + log.Info(ctx, "Bootstrap replica.", map[string]interface{}{ + "blockID": myID, + "peers": peers, + }) + + // Bootstrap replica. + replica, _ := v.(*block.Replica) + if err := replica.Bootstrap(peers); err != nil { + return nil, err + } + return &segpb.ActivateSegmentResponse{}, nil } @@ -269,7 +292,7 @@ func (s *segmentServer) InactivateSegment(ctx context.Context, observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } @@ -281,7 +304,7 @@ func (s *segmentServer) GetBlockInfo(ctx context.Context, observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } @@ -294,7 +317,7 @@ func (s *segmentServer) AppendToBlock(ctx context.Context, observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } @@ -344,7 +367,7 @@ func (s *segmentServer) ReadFromBlock(ctx context.Context, req *segpb.ReadFromBlockRequest) (*segpb.ReadFromBlockResponse, error) { observability.EntryMark(ctx) defer observability.LeaveMark(ctx) - if err := s.checkoutState(); err != nil { + if err := s.checkState(); err != nil { return nil, err } @@ -354,7 +377,7 @@ func (s *segmentServer) ReadFromBlock(ctx context.Context, return nil, errors.ErrResourceNotFound.WithMessage("the segment doesn't exist on this server") } - segBlock := segV.(block.SegmentBlock) + segBlock, _ := segV.(block.SegmentBlock) v, exist := s.blockReaders.Load(blockID) var reader block.SegmentBlockReader if !exist { @@ -365,7 +388,7 @@ func (s *segmentServer) ReadFromBlock(ctx context.Context, reader = _reader s.blockReaders.Store(blockID, reader) } else { - reader = v.(block.SegmentBlockReader) + reader, _ = v.(block.SegmentBlockReader) } entries, err := reader.Read(ctx, int(req.Offset), int(req.Number)) if err != nil { @@ -386,72 +409,54 @@ func (s *segmentServer) ReadFromBlock(ctx context.Context, }, nil } -func (s *segmentServer) startHeartbeatTask() error { +func (s *segmentServer) startHeartbeatTask(ctx context.Context) error { if s.isDebugMode { return nil } - go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - ctx := context.Background() - - LOOP: - for { - select { - case <-s.closeCh: - break LOOP - case <-ticker.C: - infos := make([]*metapb.SegmentHealthInfo, 0) - s.blocks.Range(func(key, value interface{}) bool { - infos = append(infos, value.(block.SegmentBlock).HealthInfo()) - return true - }) - req := &ctrlpb.SegmentHeartbeatRequest{ - ServerId: s.id.Uint64(), - VolumeId: s.volumeId.Uint64(), - HealthInfo: infos, - ReportTime: util.FormatTime(time.Now()), - ServerAddr: s.localAddress, - } - if err := s.cc.heartbeat(context.Background(), req); err != nil { - log.Warning(ctx, "send heartbeat to controller failed, connection lost. try to reconnecting", map[string]interface{}{ + go s.runHeartbeat(ctx) + return nil +} + +func (s *segmentServer) runHeartbeat(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-s.closeCh: + return + case <-ticker.C: + infos := make([]*metapb.SegmentHealthInfo, 0) + s.blocks.Range(func(key, value interface{}) bool { + b, _ := value.(block.SegmentBlock) + infos = append(infos, b.HealthInfo()) + return true + }) + req := &ctrlpb.SegmentHeartbeatRequest{ + ServerId: s.id.Uint64(), + VolumeId: s.volumeID.Uint64(), + HealthInfo: infos, + ReportTime: util.FormatTime(time.Now()), + ServerAddr: s.localAddress, + } + if err := s.cc.heartbeat(context.Background(), req); err != nil { + log.Warning( + ctx, + "Send heartbeat to controller failed, connection lost. try to reconnecting", + map[string]interface{}{ log.KeyError: err, }) - } } } - ticker.Stop() - }() - return nil -} - -func (s *segmentServer) generateNewBlockPath(id vanus.ID) string { - return filepath.Join(s.volumeDir, "block", id.String()) + } } func (s *segmentServer) start(ctx context.Context) error { - wg := sync.WaitGroup{} - - var err error - s.blocks.Range(func(key, value interface{}) bool { - wg.Add(1) - go func(segBlock block.SegmentBlock) { - if _err := segBlock.Initialize(ctx); err != nil { - err = errutil.Chain(err, _err) - } - // s.blockWriters.Store(segBlock.SegmentBlockID(), segBlock) - s.blockReaders.Store(segBlock.SegmentBlockID(), segBlock) - wg.Done() - }(value.(block.SegmentBlock)) - return true - }) - wg.Wait() - - if err := s.startHeartbeatTask(); err != nil { + log.Info(ctx, "Start SegmentServer.", nil) + if err := s.startHeartbeatTask(ctx); err != nil { return err } - return nil } @@ -464,9 +469,9 @@ func (s *segmentServer) stop(ctx context.Context) error { go func() { s.waitAllAppendRequestCompleted(ctx) s.blockWriters.Range(func(key, value interface{}) bool { - writer := value.(block.SegmentBlockWriter) - if _err := writer.CloseWrite(ctx); err != nil { - err = errutil.Chain(err, _err) + writer, _ := value.(block.SegmentBlockWriter) + if err2 := writer.CloseWrite(ctx); err2 != nil { + err = errutil.Chain(err, err2) } return true }) @@ -479,9 +484,9 @@ func (s *segmentServer) stop(ctx context.Context) error { go func() { s.waitAllReadRequestCompleted(ctx) s.blockReaders.Range(func(key, value interface{}) bool { - reader := value.(block.SegmentBlockReader) - if _err := reader.CloseRead(ctx); err != nil { - err = errutil.Chain(err, _err) + reader, _ := value.(block.SegmentBlockReader) + if err2 := reader.CloseRead(ctx); err2 != nil { + err = errutil.Chain(err, err2) } return true }) @@ -499,14 +504,15 @@ func (s *segmentServer) markSegmentIsFull(ctx context.Context, segId vanus.ID) e return fmt.Errorf("the SegmentBlock does not exist") } - if err := bl.(block.SegmentBlockWriter).CloseWrite(ctx); err != nil { + writer, _ := bl.(block.SegmentBlockWriter) + if err := writer.CloseWrite(ctx); err != nil { return err } // report to controller immediately _, err := s.cc.reportSegmentBlockIsFull(ctx, &ctrlpb.SegmentHeartbeatRequest{ ServerId: s.id.Uint64(), - VolumeId: s.volumeId.Uint64(), + VolumeId: s.volumeID.Uint64(), HealthInfo: []*metapb.SegmentHealthInfo{ bl.(block.SegmentBlock).HealthInfo(), }, @@ -519,18 +525,18 @@ func (s *segmentServer) waitAllAppendRequestCompleted(ctx context.Context) {} func (s *segmentServer) waitAllReadRequestCompleted(ctx context.Context) {} -func (s *segmentServer) registerSelf(ctx context.Context) (map[vanus.ID][]uint64, error) { +func (s *segmentServer) registerSelf(ctx context.Context) error { if strings.ToLower(os.Getenv(segmentServerDebugModeFlagENV)) == "true" { return s.registerSelfInDebug(ctx) } res, err := s.cc.registerSegmentServer(ctx, &ctrlpb.RegisterSegmentServerRequest{ Address: s.localAddress, - VolumeId: s.volumeId.Uint64(), + VolumeId: s.volumeID.Uint64(), Capacity: s.cfg.Volume.Capacity, }) if err != nil { - return nil, err + return err } s.id = vanus.NewIDFromUint64(res.ServerId) @@ -538,114 +544,61 @@ func (s *segmentServer) registerSelf(ctx context.Context) (map[vanus.ID][]uint64 // FIXME(james.yin): some blocks may not be bound to segment. // No block in the volume of this server. - if len(res.Segments) <= 0 { - return map[vanus.ID][]uint64{}, nil + if len(res.Segments) == 0 { + return nil } - blockPeers := make(map[vanus.ID][]uint64) for _, segmentpb := range res.Segments { - if len(segmentpb.Replicas) <= 0 { + if len(segmentpb.Replicas) == 0 { continue } var myID vanus.ID - peers := make([]uint64, 0, len(segmentpb.Replicas)) for blockID, blockpb := range segmentpb.Replicas { - peers = append(peers, blockID) // Don't use address to compare - if blockpb.VolumeID == s.volumeId.Uint64() { + if blockpb.VolumeID == s.volumeID.Uint64() { if myID != 0 { // FIXME(james.yin): multiple blocks of same segment in this server. } myID = vanus.NewIDFromUint64(blockID) - } else { - // register peer - s.resolver.Register(blockID, blockpb.Endpoint) } } if myID == 0 { // TODO(james.yin): no my block continue } - blockPeers[myID] = peers + // Register peers. + for blockID, blockpb := range segmentpb.Replicas { + if blockpb.Endpoint == "" { + if blockpb.VolumeID == s.volumeID.Uint64() { + blockpb.Endpoint = s.localAddress + } else { + log.Warning(ctx, "Block is offline.", map[string]interface{}{ + "blockID": blockID, + "volumeID": blockpb.VolumeID, + "segmentID": segmentpb.Id, + "eventlogID": segmentpb.EventLogId, + }) + continue + } + } + s.resolver.Register(blockID, blockpb.Endpoint) + } } - return blockPeers, nil + + return nil } -func (s *segmentServer) registerSelfInDebug(ctx context.Context) (map[vanus.ID][]uint64, error) { +func (s *segmentServer) registerSelfInDebug(ctx context.Context) error { log.Info(ctx, "the segment server debug mode enabled", nil) s.id = vanus.NewID() s.isDebugMode = true - files, err := filepath.Glob(filepath.Join(s.volumeDir, "block", "*")) - if err != nil { - return nil, err - } - - blockPeers := make(map[vanus.ID][]uint64) - for _, file := range files { - blockID, err := strconv.ParseUint(filepath.Base(file), 10, 64) - if err != nil { - return nil, err - } - // TODO(james.yin): multiple peers - blockPeers[vanus.NewIDFromUint64(blockID)] = []uint64{blockID} - } - return blockPeers, nil -} - -func (s *segmentServer) recoverBlocks(ctx context.Context, blockPeers map[vanus.ID][]uint64, raftLogs map[vanus.ID]*raftlog.Log) error { - blockDir := filepath.Join(s.volumeDir, "block") - - // Make sure the block directory exists. - if err := os.MkdirAll(blockDir, 0755); err != nil { - return err - } - - // TODO: optimize this, because the implementation assumes under storage is linux file system - for blockID, peers := range blockPeers { - blockPath := filepath.Join(blockDir, blockID.String()) - b, err := block.OpenFileSegmentBlock(ctx, blockPath) - if err != nil { - return err - } - log.Info(ctx, "the block was loaded", map[string]interface{}{ - "id": b.SegmentBlockID().String(), - }) - // TODO(james.yin): initialize block - - s.blocks.Store(blockID, b) - - // recover replica - if b.IsAppendable() { - raftLog := raftLogs[blockID] - // raft log has been compacted - if raftLog == nil { - raftLog = raftlog.NewLog(blockID, s.wal, peers) - } else { - // TODO(james.yin): set peers - } - replica := s.makeReplicaWithRaftLog(context.TODO(), b, raftLog) - s.blockWriters.Store(b.SegmentBlockID(), replica) - } - } - - for blockID, raftLog := range raftLogs { - b, ok := s.blocks.Load(blockID) - if !ok { - // TODO(james.yin): no block for raft log, compact - } - if _, ok = b.(*block.Replica); !ok { - // TODO(james.yin): block is not appendable, compact - } - _ = raftLog - } - return nil } func (s *segmentServer) makeReplica(ctx context.Context, b block.SegmentBlock) *block.Replica { - raftLog := raftlog.NewLog(b.SegmentBlockID(), s.wal, nil) + raftLog := raftlog.NewLog(b.SegmentBlockID(), s.wal, s.metaStore, s.offsetStore) return s.makeReplicaWithRaftLog(ctx, b, raftLog) } @@ -655,7 +608,7 @@ func (s *segmentServer) makeReplicaWithRaftLog(ctx context.Context, b block.Segm return replica } -func (s *segmentServer) checkoutState() error { +func (s *segmentServer) checkState() error { if s.state != primitive.ServerStateRunning { return errors.ErrServiceState.WithMessage(fmt.Sprintf( "the server isn't ready to work, current state:%s", s.state)) diff --git a/internal/store/wal/compaction.go b/internal/store/wal/compaction.go new file mode 100644 index 000000000..eb74840dd --- /dev/null +++ b/internal/store/wal/compaction.go @@ -0,0 +1,19 @@ +// Copyright 2022 Linkall Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +func (w *WAL) Compact(off int64) error { + return nil +} diff --git a/internal/store/wal/stream.go b/internal/store/wal/stream.go index df82b75b2..48e4267c2 100644 --- a/internal/store/wal/stream.go +++ b/internal/store/wal/stream.go @@ -110,7 +110,7 @@ func (s *logStream) Visit(visitor WalkFunc, compacted int64) (int64, error) { return -1, err } - for so := 0; so <= blockSize-record.HeaderSize; { + for so := firstRecordOffset(f.so, compacted); so <= blockSize-record.HeaderSize; { r, err2 := record.Unmashal(buf[so:]) if err2 != nil { // TODO(james.yin): handle parse error @@ -197,3 +197,10 @@ func firstBlockOffset(so, compacted int64) int64 { } return 0 } + +func firstRecordOffset(so, compacted int64) int { + if so < compacted { + return int(compacted % blockSize) + } + return 0 +}