棋牌游戏平台:Golang 实现 Redis(4): AOF {持久化与}AOF重写

admin 6个月前 (04-09) 科技 50 0

‘本文是使用’ golang (实现) redis 『系列的第四篇文章』,将先容《若》何使用 golang (实现) Append Only File 持久化及 AOF ‘【文件】’重写。

本文完整源代码 在[作者GithubHDT3213/godis

AOF ‘【文件】’

AOF 『持久化是典型的异步义务』,「主协程」(goroutine) 「可以使用」 channel ‘将数据发送到异步协程由’异步协程执行持久化【操作】。

在[ DB 『中界说相关字段』:

type DB struct {
    // 主线程使用此channel将要持久化的【下令发送到】异步协程
    aofChan     chan *reply.MultiBulkReply 
    // append file ‘【文件】’描述符
    aofFile     *os.File  
    // append file 路径
	aofFilename string 

    // aof 重写需要的缓冲区,“将 在[”AOF‘重’写一节详细先容
    aofRewriteChan chan *reply.MultiBulkReply 
    //  在[需要的时刻使用此字段{《暂停》}持久化【操作】
	pausingAof     sync.RWMutex 
}

在[举行持久化时需要注重两个细节:

  1. get 之类的读下令并不需要举行持久化
  2. expire 【下令要用等效的】 expireat 下令替换。{举例说明},10:00 执行 expire a 3600 示意键 a 在[ 11:00 ‘过时’, 在[ 10:30 载入AOF‘【文件】’时执行 expire a 3600 就成了 11:30 ‘过时’与原数据不符。

我们 在[下令处置方式中返回 AOF 需要的分外信息:

type extra struct {
    // 【示意该下令是】否需要持久化
    toPersist  bool 
    // 『如上文所述』 expire {之类的下令不能直接持久化}
    // 《若》 specialAof == nil (则将下令原样持久化),否则持久化 specialAof 中的指令
	specialAof []*reply.MultiBulkReply 
}

type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)

以 SET {下令为}例:

func Set(db *DB, args [][]byte) (redis.Reply, *extra) {
    //....
    var result int
    switch policy {
    case upsertPolicy:
        result = db.Put(key, entity)
    case insertPolicy:
        result = db.PutIfAbsent(key, entity)
    case updatePolicy:
        result = db.PutIfExists(key, entity)
    }
    extra := &extra{toPersist: result > 0} // 《若》现实“〖(写入)〗”了数据则toPresist=true, 《若》〖由于〗XX『或』NX(选项没有现实“〖(写入)〗”数)据则toPresist=false
    if result > 0 {
        if ttl != unlimitedTTL { // 使用了 EX 『或』 NX 选项
            expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond)
            db.Expire(key, expireTime)
            // 《持久化时使用》 set key value 和 pexpireat 『下令取代』 set key value EX ttl 下令
            extra.specialAof = []*reply.MultiBulkReply{ 
                reply.MakeMultiBulkReply([][]byte{
                    []byte("SET"),
                    args[0],
                    args[1],
                }),
                makeExpireCmd(key, expireTime),
            }
        } else {
            db.Persist(key) // override ttl
        }
    }
    return &reply.OkReply{}, extra
}

var pExpireAtCmd = []byte("PEXPIREAT")

func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply {
	args := make([][]byte, 3)
	args[0] = pExpireAtCmd
	args[1] = []byte(key)
	args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10))
	return reply.MakeMultiBulkReply(args)
}

在[处置下令的调剂方式中将 aof 【下令发送到】 channel:

func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
	// ....
	// normal commands
	var extra *extra
	cmdFunc, ok := router[cmd] // 找到下令对应的处置函数
	if !ok {
		return reply.MakeErrReply("ERR unknown command '" + cmd + "'")
    }
    // 使用处置函数执行下令
	if len(args) > 1 {
		result, extra = cmdFunc(db, args[1:])
	} else {
		result, extra = cmdFunc(db, [][]byte{})
	}

	// AOF 持久化
	if config.Properties.AppendOnly {
		if extra != nil && extra.toPersist {
            // “〖(写入)〗” specialAof
			if extra.specialAof != nil && len(extra.specialAof) > 0 {
				for _, r := range extra.specialAof {
					db.addAof(r)
				}
			} else {
                // “〖(写入)〗”原始下令
				r := reply.MakeMultiBulkReply(args)
				db.addAof(r)
			}
		}
	}
	return
}

在[异步协程中“〖(写入)〗”下令:

func (db *DB) handleAof() {
	for cmd := range db.aofChan {
        // 异步协程 在[持久化之前会实验获取锁,《若》其他协程持有锁则会{《暂停》}持久化【操作】
        // 锁也保证了每次“〖(写入)〗”完整的一条指令不会花样错误
		db.pausingAof.RLock() 
		if db.aofRewriteChan != nil {
			db.aofRewriteChan <- cmd
		}
		_, err := db.aofFile.Write(cmd.ToBytes())
		if err != nil {
			logger.Warn(err)
		}
		db.pausingAof.RUnlock()
	}
}

“读取历程与协议解”析器一节基本相同,不 在[正文中赘述:loadAof。

AOF 重写

《若》我们对键a赋值100次会 在[AOF‘【文件】’中发生100(条指令但只有最后一)条指令是有用的,为了削减持久化‘【文件】’的巨细需要举行AOF(重写以删除无用的指令)。

重写必须 在[牢固稳定的数据集上举行,「不」能直接使用内存中的数据。Redis 重写的(实现)方式是举行 fork 并 在[子历程中遍历数据库内的数据重新天生AOF‘【文件】’。〖由于〗 golang 不支持 fork 【操作】,我们只能接纳读取AOF‘【文件】’天生副本的方式来取代fork。

在[举行AOF重写【操作】时需要知足两个要求:

  1. 《若》 AOF 重写失败『或』被中止,AOF ‘【文件】’需保持重写之前的状态不能丢失数据
  2. 举行 AOF 《重写时代执行的下令必须保存到新的》AOF‘【文件】’中, 不能丢失

因此我们设计了一套比较复杂的流程:

  1. {《暂停》}AOF“〖(写入)〗” -> 更改状态为重写中 -> ‘复制当前’AOF‘【文件】’ -> 恢复AOF“〖(写入)〗”
  2. 在[重写历程中,持久化协程 在[将下令“〖(写入)〗”‘【文件】’的同时也将其“〖(写入)〗”内存中的重写缓存区
  3. 重写协程读取AOF副本并将重写到临时‘【文件】’(tmp.aof)中
  4. {《暂停》}AOF“〖(写入)〗” -> 将重写缓冲区中的下令“〖(写入)〗”tmp.aof -> 使用临时‘【文件】’tmp.aof《笼罩》AOF‘【文件】’(使用‘【文件】’系统的mv『下令保证平安』)-> 【清空重写缓冲区】 -> 恢复AOF“〖(写入)〗”

在[不壅闭 在[线〖服务〗的同时举行其它【操作】是一项必须的能力,AOF重写的思绪 在[解决这类问题时具有主要的参考价值。<好比>Mysql Online DDL: gh-ost“接纳了类似的计谋保证数据一致”。

首先准备最先重写【操作】:

func (db *DB) startRewrite() (*os.File, error) {
    // {《暂停》}AOF“〖(写入)〗”, 数据会 在[ db.aofChan 中暂时聚积
	db.pausingAof.Lock() 
	defer db.pausingAof.Unlock()

	// 建立重写缓冲区
	db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)

	// 建立临时‘【文件】’
	file, err := ioutil.TempFile("", "aof")
	if err != nil {
		logger.Warn("tmp file create failed")
		return nil, err
	}
	return file, nil
}

在[重写历程中,「持久化协程举行双写」:

func (db *DB) handleAof() {
	for cmd := range db.aofChan {
		db.pausingAof.RLock() 
		if db.aofRewriteChan != nil {
            // 数据“〖(写入)〗”重写缓冲区
			db.aofRewriteChan <- cmd
		}
		_, err := db.aofFile.Write(cmd.ToBytes())
		if err != nil {
			logger.Warn(err)
		}
		db.pausingAof.RUnlock()
	}
}

执行重写:

func (db *DB) aofRewrite() {
	file, err := db.startRewrite()
	if err != nil {
		logger.Warn(err)
		return
	}

	// load aof file
	tmpDB := &DB{
		Data:     dict.MakeSimple(),
		TTLMap:   dict.MakeSimple(),
		Locker:   lock.Make(lockerSize),
		interval: 5 * time.Second,

		aofFilename: db.aofFilename,
	}
	tmpDB.loadAof()

	// rewrite aof file
	tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
		var cmd *reply.MultiBulkReply
		entity, _ := raw.(*DataEntity)
		switch val := entity.Data.(type) {
		case []byte:
			cmd = persistString(key, val)
		case *List.LinkedList:
			cmd = persistList(key, val)
		case *set.Set:
			cmd = persistSet(key, val)
		case dict.Dict:
			cmd = persistHash(key, val)
		case *SortedSet.SortedSet:
			cmd = persistZSet(key, val)

		}
		if cmd != nil {
			_, _ = file.Write(cmd.ToBytes())
		}
		return true
	})
	tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool {
		expireTime, _ := raw.(time.Time)
		cmd := makeExpireCmd(key, expireTime)
		if cmd != nil {
			_, _ = file.Write(cmd.ToBytes())
		}
		return true
	})

	db.finishRewrite(file)
}

重写完毕后“〖(写入)〗”缓冲区中的数据并替换正式‘【文件】’:

func (db *DB) finishRewrite(tmpFile *os.File) {
    // {《暂停》}AOF“〖(写入)〗”
	db.pausingAof.Lock() 
	defer db.pausingAof.Unlock()


    // 将重写缓冲区内的数据“〖(写入)〗”临时‘【文件】’
	// 〖由于〗handleAof已被{《暂停》}, 在[遍历时代aofRewriteChan中不会有新数据
    loop:
	for {
		select {
		case cmd := <-db.aofRewriteChan:
			_, err := tmpFile.Write(cmd.ToBytes())
			if err != nil {
				logger.Warn(err)
			}
		default:
			// 只有 channel (为空时才会进入此分支)
			break loop
		}
    }
    // {释放重写缓冲区}
	close(db.aofRewriteChan)
	db.aofRewriteChan = nil

	// 使用临时‘【文件】’取代aof‘【文件】’
	_ = db.aofFile.Close()
	_ = os.Rename(tmpFile.Name(), db.aofFilename)

	// 重新打开‘【文件】’描述符以保证正常“〖(写入)〗”
	aofFile, err := os.OpenFile(db.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		panic(err)
	}
	db.aofFile = aofFile
}
,

诚信 在[线手机版

诚信 在[线(现:阳光 在[线官网)现已开放诚信 在[线手机版、诚信 在[线电脑客户端下载。诚信 在[线娱乐游戏公平、公开、【公正】,用实力赢取信誉。

申博声明:该文看法仅代表作者自己,与本平台无关。转载请注明:棋牌游戏平台:Golang 实现 Redis(4): AOF {持久化与}AOF重写

网友评论

  • (*)

最新评论