From 810b2e15b19b89436c5d2ad97764a094d08813a0 Mon Sep 17 00:00:00 2001 From: Quan Date: Thu, 24 Nov 2022 19:54:08 +0800 Subject: [PATCH] =?UTF-8?q?update:=20=E5=AE=9E=E7=8E=B0aof=E9=87=8D?= =?UTF-8?q?=E5=86=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 5 +- .idea/{Godis.iml => GoKV.iml} | 0 .idea/modules.xml | 2 +- README.md | 5 +- aof/marshal.go | 17 ++++ aof/rewrite.go | 182 ++++++++++++++++++++++++++++++++++ appendonly.aof | 110 +++++++++++++++++++- database/database.go | 29 +++++- database/single_db.go | 18 ++++ interface/database/db.go | 3 +- 10 files changed, 362 insertions(+), 9 deletions(-) rename .idea/{Godis.iml => GoKV.iml} (100%) create mode 100644 aof/rewrite.go diff --git a/.gitignore b/.gitignore index becf36a..0b28c09 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ # 忽略logs logs -*.log \ No newline at end of file +*.log + +aof +*.aof \ No newline at end of file diff --git a/.idea/Godis.iml b/.idea/GoKV.iml similarity index 100% rename from .idea/Godis.iml rename to .idea/GoKV.iml diff --git a/.idea/modules.xml b/.idea/modules.xml index de9d3fe..fc138f7 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/README.md b/README.md index 95df8d5..04fcc3a 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,12 @@ # 04_support_redis_command 分支 ## 简介 该分支主要实现各种redis所支持的命令,同时实现各种redis场景的数据结构 -同时完善godis的持久化功能,包括aof、rdb +同时完善GoKV的持久化功能,包括aof、rdb ## 时间记录 - 开始:22.11.17 ## 实现记录 1. 22.11.18:String数据结构,实现基本的get/set命令 -2. 22.11.21 ~ 22.11.23:实现aof写入与读取 \ No newline at end of file +2. 22.11.21 ~ 22.11.23:实现aof写入与读取 +3. 22.11.24:实现aof重写功能 \ No newline at end of file diff --git a/aof/marshal.go b/aof/marshal.go index 70ada62..0855920 100644 --- a/aof/marshal.go +++ b/aof/marshal.go @@ -1,11 +1,28 @@ package aof import ( + "Godis/interface/database" "Godis/redis/protocol" "strconv" "time" ) +// EntityToCmd 序列化一个数据库实例为redis命令 +func EntityToCmd(key string, entity *database.DataEntity) *protocol.MultiBulkReply { + if entity == nil { + return nil + } + + var cmd *protocol.MultiBulkReply + switch val := entity.Data.(type) { + case []byte: + cmd = stringToCmd(key, val) + // TODO 支持更多格式 + } + + return cmd +} + // Set 命令 var setCmd = []byte("SET") diff --git a/aof/rewrite.go b/aof/rewrite.go new file mode 100644 index 0000000..3e6aa4c --- /dev/null +++ b/aof/rewrite.go @@ -0,0 +1,182 @@ +package aof + +import ( + "Godis/config" + "Godis/interface/database" + "Godis/lib/logger" + "Godis/lib/utils" + "Godis/redis/protocol" + "io" + "os" + "strconv" + "time" +) + +/* + AOF 重写 +*/ + +// RewriteCtx 重写上下文 +type RewriteCtx struct { + tmpFile *os.File // 存储aof的临时文件 + fileSize int64 + dbIndex int +} + +func (handler *Handler) newRewriteHandler() *Handler { + h := &Handler{} + h.aofFileName = handler.aofFileName + h.db = handler.tmpDBMaker() + return h +} + +// Rewrite 重写方法的上层接口 +func (handler *Handler) Rewrite() error { + ctx, err := handler.StartRewrite() + if err != nil { + return nil + } + err = handler.DoRewrite(ctx) + if err != nil { + return nil + } + handler.FinishRewrite(ctx) + return nil +} + +// StartRewrite 为重写的开始做准备 +func (handler *Handler) StartRewrite() (*RewriteCtx, error) { + // 1. 暂时停止重写操作,关闭aofChan。此时新数据会堆积在aofChan上 + handler.pausingAof.Lock() + defer handler.pausingAof.Unlock() + + // 2. 调用fsync是缓冲区的数据落盘,防止aof数据不完整 + err := handler.aofFile.Sync() + if err != nil { + logger.Warn("fsync failed") + return nil, err + } + + // 3. 获取当前aof文件消息,以备之后判断哪些数据是重写过程中产生的新数据 + fileInfo, err := os.Stat(handler.aofFileName) + if err != nil { + logger.Warn("get file stat failed") + // return nil, err // 暂时略过错误,继续执行 + } + fileSize := fileInfo.Size() + + // 4. 创建aof文件的临时副本 + // tmpFile, err := os.CreateTemp("", "*.aof") // ERROR: 直接创建temp文件在C盘中,而不是项目路径内 + tmpFile, err := os.Create("tmpAofFile.aof") + if err != nil { + logger.Warn("create temp file failed") + return nil, err + } + + return &RewriteCtx{ + tmpFile: tmpFile, + fileSize: fileSize, + dbIndex: 0, + }, nil +} + +// DoRewrite 执行aof重写 +func (handler *Handler) DoRewrite(ctx *RewriteCtx) error { + tmpFile := ctx.tmpFile + + // 1. 加载旧的aof数据 + tmpAof := handler.newRewriteHandler() + tmpAof.LoadAOF(int(ctx.fileSize)) // TODO 打断点调试! + + // 2. 执行重写操作 + for i := 0; i < config.Properties.Databases; i++ { + // select db:遍历每一个db,备份数据 + data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(i))).ToBytes() + _, err := tmpFile.Write(data) + if err != nil { + return err + } + + // 定义写入aof的匿名函数 + writeDataToAof := func(key string, entity *database.DataEntity, expiration *time.Time) bool { + // 遍历到的每一个数据,都对其执行func方法中的操作 ————> golang的函数式编程 + cmd := EntityToCmd(key, entity) + if cmd != nil { + _, _ = tmpFile.Write(cmd.ToBytes()) + } + if expiration != nil { + cmd = MakeExpireCmd(key, *expiration) + if cmd != nil { + _, _ = tmpFile.Write(cmd.ToBytes()) + } + } + return true + } + + handler.db.ForEach(i, writeDataToAof) + } + + return nil +} + +// FinishRewrite 结束aof重写 +func (handler *Handler) FinishRewrite(ctx *RewriteCtx) { + // 1. 暂停aof写入 + handler.pausingAof.Lock() + defer handler.pausingAof.Unlock() + + // 2. 操作旧的aof文件,将文件指针指向重写开始前的位置 + oldAof, err := os.Open(handler.aofFileName) // 只读模式启动 + if err != nil { + logger.Error("open aofFilename failed: " + err.Error()) + return + } + //defer func() { + // // 不要忘记关闭资源!!! + // _ = oldAof.Close() + //}() + + // 文件指针指向重写开启前的位置 + _, err = oldAof.Seek(ctx.fileSize, 0) + if err != nil { + logger.Error("oldAof seek failed: " + err.Error()) + return + } + + // 3. 在tmpAof文件末尾插入一条Select语句,确保tmpAof末尾的数据库选择,与重写开始后新数据写入的选择一致,保证新的aof文件连贯性 + tmpAof := ctx.tmpFile + data := protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(ctx.dbIndex))).ToBytes() + _, err = tmpAof.Write(data) + if err != nil { + logger.Error("tmpAof rewrite failed: " + err.Error()) + return + } + + // 4. 将新数据拷贝到tmpAof文件中 + _, err = io.Copy(tmpAof, oldAof) + if err != nil { + logger.Error("copy aof to tmpAof failed: " + err.Error()) + return + } + + // 5. 替换掉原有的aof文件 + _ = handler.aofFile.Close() // 先关闭原有文件,再做更新处理 + _ = oldAof.Close() + _ = tmpAof.Close() + _ = os.Rename(tmpAof.Name(), handler.aofFileName) + + // 6. 重新打开aofFile,并更新到handler中 + aofFile, err := os.OpenFile(handler.aofFileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + // 此处如果打不开,那么程序必须崩溃,因为aof文件已经被修改了,出现错误无法补救 + panic(err) + } + handler.aofFile = aofFile // 更新 + + // 7. 数据库同步。在aof文件中追加一条select,确保与当前currentDB一致 + data = protocol.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(handler.currentDB))).ToBytes() + _, err = handler.aofFile.Write(data) + if err != nil { + panic(err) + } +} diff --git a/appendonly.aof b/appendonly.aof index 99cc04d..cea5fdc 100644 --- a/appendonly.aof +++ b/appendonly.aof @@ -1,7 +1,111 @@ +*2 +$6 +SELECT +$1 +0 *3 $3 -set -$4 -quan +SET +$5 +ccone +$3 +c23 +*3 +$3 +SET $4 quan +$31 +eiofjiwojgiojweiofjweiojfiowejf +*3 +$3 +SET +$8 +quanquan +$8 +quanquan +*2 +$6 +SELECT +$1 +1 +*2 +$6 +SELECT +$1 +2 +*2 +$6 +SELECT +$1 +3 +*2 +$6 +SELECT +$1 +4 +*2 +$6 +SELECT +$1 +5 +*2 +$6 +SELECT +$1 +6 +*2 +$6 +SELECT +$1 +7 +*2 +$6 +SELECT +$1 +8 +*2 +$6 +SELECT +$1 +9 +*2 +$6 +SELECT +$2 +10 +*2 +$6 +SELECT +$2 +11 +*2 +$6 +SELECT +$2 +12 +*2 +$6 +SELECT +$2 +13 +*2 +$6 +SELECT +$2 +14 +*2 +$6 +SELECT +$2 +15 +*2 +$6 +SELECT +$1 +0 +*2 +$6 +SELECT +$1 +0 diff --git a/database/database.go b/database/database.go index 366812a..2e3ecf5 100644 --- a/database/database.go +++ b/database/database.go @@ -11,6 +11,7 @@ import ( "runtime/debug" "strings" "sync/atomic" + "time" ) // MultiDB is a set of multiple database set @@ -53,7 +54,10 @@ func (mdb *MultiDB) Exec(client redis.Connection, cmdLine [][]byte) (result redi // TODO 2. 集群命令 - // TODO 3. 无法在集群模式下执行的特殊命令 + // 3. 无法在集群模式下执行的特殊命令 + if cmdName == "rewriteaof" { + return RewriteAOF(mdb, cmdLine[1:]) + } // 4. 普通命令 dbIndex := client.GetDBIndex() @@ -150,3 +154,26 @@ func MakeBasicMultiDB() database.EmbedDB { } return mdb } + +// ForEach traverses all the keys in the given database +func (mdb *MultiDB) ForEach(dbIndex int, cb func(key string, data *database.DataEntity, expiration *time.Time) bool) { + mdb.mustSelectDB(dbIndex).ForEach(cb) +} + +// mustSelectDB 必须成功选择一个db,否则panic +func (mdb *MultiDB) mustSelectDB(index int) *DB { + selectDB, err := mdb.SelectDB(index) + if err != nil { + panic(err) + } + return selectDB +} + +// RewriteAOF 启动aof重写 +func RewriteAOF(db *MultiDB, args [][]byte) redis.Reply { + err := db.aofHandler.Rewrite() + if err != nil { + return protocol.MakeErrReply(err.Error()) + } + return protocol.MakeOkReply() +} diff --git a/database/single_db.go b/database/single_db.go index f171b92..1cbe31a 100644 --- a/database/single_db.go +++ b/database/single_db.go @@ -210,6 +210,24 @@ func (db *DB) Remove(key string) { // TODO 原子事务实现 } +// ForEach 遍历数据库,并将数据使用cb函数处理 +func (db *DB) ForEach(cb func(key string, data *database.DataEntity, expiration *time.Time) bool) { + // 调用data自身的foreach遍历 + db.data.ForEach(func(key string, raw interface{}) bool { + // 将遍历出来的interface转换为DataEntity + entity, _ := raw.(*database.DataEntity) + // 获取ttlMap中的过期时间 + var expirationTime *time.Time + rawExpiredTime, ok := db.ttlMap.Get(key) + if ok { + expireTime := rawExpiredTime.(time.Time) + expirationTime = &expireTime + } + // 将遍历出来的数据,使用cb函数处理 + return cb(key, entity, expirationTime) + }) +} + /* ------- redis键值对版本控制 --------- */ // addVersion 版本号自增 diff --git a/interface/database/db.go b/interface/database/db.go index 5970a05..bac9f5b 100644 --- a/interface/database/db.go +++ b/interface/database/db.go @@ -2,6 +2,7 @@ package database import ( "Godis/interface/redis" + "time" ) // CmdLine is alias for [][]byte, represents a command line @@ -20,7 +21,7 @@ type EmbedDB interface { //ExecWithLock(conn redis.Connection, cmdLine [][]byte) redis.Reply //ExecMulti(conn redis.Connection, watching map[string]uint32, cmdLines []CmdLine) redis.Reply //GetUndoLogs(dbIndex int, cmdLine [][]byte) []CmdLine - //ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool) + ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool) //RWLocks(dbIndex int, writeKeys []string, readKeys []string) //RWUnLocks(dbIndex int, writeKeys []string, readKeys []string) //GetDBSize(dbIndex int) (int, int)