From 47b178ac90ec34cf279291a3e403bcc08a3dac70 Mon Sep 17 00:00:00 2001 From: heimoshuiyu Date: Tue, 14 Dec 2021 01:19:50 +0800 Subject: [PATCH] Add: tmpfs lock and fix bug --- pkg/api/handle_stream.go | 7 ++++++- pkg/tmpfs/tmpfs.go | 37 ++++++++++++++++++++++++++++++++----- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/pkg/api/handle_stream.go b/pkg/api/handle_stream.go index 5f34ee5..b59baeb 100644 --- a/pkg/api/handle_stream.go +++ b/pkg/api/handle_stream.go @@ -142,8 +142,10 @@ func (api *API) HandlePrepareFileStreamDirect(w http.ResponseWriter, r *http.Req json.NewEncoder(w).Encode(prepareFileStreamDirectResponse) return } + + // lock the object + api.Tmpfs.Lock(objPath) - api.Tmpfs.Record(objPath) args := strings.Split(ffmpegConfig.Args, " ") startArgs := []string{"-threads", strconv.FormatInt(api.APIConfig.FfmpegThreads, 10), "-i", srcPath} endArgs := []string{"-vn", "-y", objPath} @@ -156,6 +158,9 @@ func (api *API) HandlePrepareFileStreamDirect(w http.ResponseWriter, r *http.Req return } + api.Tmpfs.Record(objPath) + api.Tmpfs.Unlock(objPath) + fileInfo, err := os.Stat(objPath) if err != nil { api.HandleError(w, r, err) diff --git a/pkg/tmpfs/tmpfs.go b/pkg/tmpfs/tmpfs.go index 287c68e..671b4e9 100644 --- a/pkg/tmpfs/tmpfs.go +++ b/pkg/tmpfs/tmpfs.go @@ -13,12 +13,28 @@ type Tmpfs struct { record map[string]int64 Config TmpfsConfig wg sync.WaitGroup + recordLocks map[string]*sync.Mutex } func (tmpfs *Tmpfs) GetObjFilePath(id int64, configName string) (string) { return filepath.Join(tmpfs.Config.Root, strconv.FormatInt(id, 10) + "." + configName + ".ogg") } +func (tmpfs *Tmpfs) GetLock(filename string) *sync.Mutex { + if _, ok := tmpfs.recordLocks[filename]; !ok { + tmpfs.recordLocks[filename] = &sync.Mutex{} + } + return tmpfs.recordLocks[filename] +} + +func (tmpfs *Tmpfs) Lock(filename string) { + tmpfs.GetLock(filename).Lock() +} + +func (tmpfs *Tmpfs) Unlock(filename string) { + tmpfs.GetLock(filename).Unlock() +} + type TmpfsConfig struct { FileLifeTime int64 `json:"file_life_time"` CleanerInternal int64 `json:"cleaner_internal"` @@ -34,6 +50,7 @@ func NewTmpfs(config TmpfsConfig) *Tmpfs { tmpfs := &Tmpfs{ record: make(map[string]int64), Config: config, + recordLocks: make(map[string]*sync.Mutex), } tmpfs.wg.Add(1) go tmpfs.Cleaner() @@ -53,15 +70,25 @@ func (tmpfs *Tmpfs) Cleaner() { var err error for { now := time.Now().Unix() - for key, value := range tmpfs.record { - if now - value > tmpfs.Config.FileLifeTime { - err = os.Remove(key) + for path, lock := range tmpfs.recordLocks { + lock.Lock() + recordTime, ok := tmpfs.record[path] + if !ok { + lock.Unlock() + continue + } + if now - recordTime > tmpfs.Config.FileLifeTime { + err = os.Remove(path) if err != nil { log.Println("[tmpfs] Failed to remove file", err) + lock.Unlock() + continue } - log.Println("[tmpfs] Deleted file", key) - delete(tmpfs.record, key) + log.Println("[tmpfs] Deleted file", path) + delete(tmpfs.record, path) + delete(tmpfs.recordLocks, path) } + lock.Unlock() } time.Sleep(time.Second)