Add: tmpfs lock and fix bug

This commit is contained in:
2021-12-14 01:19:50 +08:00
parent 922b2370ee
commit 47b178ac90
2 changed files with 38 additions and 6 deletions

View File

@@ -142,8 +142,10 @@ func (api *API) HandlePrepareFileStreamDirect(w http.ResponseWriter, r *http.Req
json.NewEncoder(w).Encode(prepareFileStreamDirectResponse) json.NewEncoder(w).Encode(prepareFileStreamDirectResponse)
return return
} }
// lock the object
api.Tmpfs.Lock(objPath)
api.Tmpfs.Record(objPath)
args := strings.Split(ffmpegConfig.Args, " ") args := strings.Split(ffmpegConfig.Args, " ")
startArgs := []string{"-threads", strconv.FormatInt(api.APIConfig.FfmpegThreads, 10), "-i", srcPath} startArgs := []string{"-threads", strconv.FormatInt(api.APIConfig.FfmpegThreads, 10), "-i", srcPath}
endArgs := []string{"-vn", "-y", objPath} endArgs := []string{"-vn", "-y", objPath}
@@ -156,6 +158,9 @@ func (api *API) HandlePrepareFileStreamDirect(w http.ResponseWriter, r *http.Req
return return
} }
api.Tmpfs.Record(objPath)
api.Tmpfs.Unlock(objPath)
fileInfo, err := os.Stat(objPath) fileInfo, err := os.Stat(objPath)
if err != nil { if err != nil {
api.HandleError(w, r, err) api.HandleError(w, r, err)

View File

@@ -13,12 +13,28 @@ type Tmpfs struct {
record map[string]int64 record map[string]int64
Config TmpfsConfig Config TmpfsConfig
wg sync.WaitGroup wg sync.WaitGroup
recordLocks map[string]*sync.Mutex
} }
func (tmpfs *Tmpfs) GetObjFilePath(id int64, configName string) (string) { func (tmpfs *Tmpfs) GetObjFilePath(id int64, configName string) (string) {
return filepath.Join(tmpfs.Config.Root, strconv.FormatInt(id, 10) + "." + configName + ".ogg") return filepath.Join(tmpfs.Config.Root, strconv.FormatInt(id, 10) + "." + configName + ".ogg")
} }
func (tmpfs *Tmpfs) GetLock(filename string) *sync.Mutex {
if _, ok := tmpfs.recordLocks[filename]; !ok {
tmpfs.recordLocks[filename] = &sync.Mutex{}
}
return tmpfs.recordLocks[filename]
}
func (tmpfs *Tmpfs) Lock(filename string) {
tmpfs.GetLock(filename).Lock()
}
func (tmpfs *Tmpfs) Unlock(filename string) {
tmpfs.GetLock(filename).Unlock()
}
type TmpfsConfig struct { type TmpfsConfig struct {
FileLifeTime int64 `json:"file_life_time"` FileLifeTime int64 `json:"file_life_time"`
CleanerInternal int64 `json:"cleaner_internal"` CleanerInternal int64 `json:"cleaner_internal"`
@@ -34,6 +50,7 @@ func NewTmpfs(config TmpfsConfig) *Tmpfs {
tmpfs := &Tmpfs{ tmpfs := &Tmpfs{
record: make(map[string]int64), record: make(map[string]int64),
Config: config, Config: config,
recordLocks: make(map[string]*sync.Mutex),
} }
tmpfs.wg.Add(1) tmpfs.wg.Add(1)
go tmpfs.Cleaner() go tmpfs.Cleaner()
@@ -53,15 +70,25 @@ func (tmpfs *Tmpfs) Cleaner() {
var err error var err error
for { for {
now := time.Now().Unix() now := time.Now().Unix()
for key, value := range tmpfs.record { for path, lock := range tmpfs.recordLocks {
if now - value > tmpfs.Config.FileLifeTime { lock.Lock()
err = os.Remove(key) recordTime, ok := tmpfs.record[path]
if !ok {
lock.Unlock()
continue
}
if now - recordTime > tmpfs.Config.FileLifeTime {
err = os.Remove(path)
if err != nil { if err != nil {
log.Println("[tmpfs] Failed to remove file", err) log.Println("[tmpfs] Failed to remove file", err)
lock.Unlock()
continue
} }
log.Println("[tmpfs] Deleted file", key) log.Println("[tmpfs] Deleted file", path)
delete(tmpfs.record, key) delete(tmpfs.record, path)
delete(tmpfs.recordLocks, path)
} }
lock.Unlock()
} }
time.Sleep(time.Second) time.Sleep(time.Second)