From c9a2d2f9ce3fd7b9c724d8d2079abf43c2aa339b Mon Sep 17 00:00:00 2001 From: sentriz Date: Mon, 9 Oct 2023 20:43:51 +0100 Subject: [PATCH] feat(transcode): lock the destination transcode cache path --- transcode/transcode_test.go | 41 +++++++++++++++++++++++++++++++++ transcode/transcoder_caching.go | 21 +++++++++++++++-- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/transcode/transcode_test.go b/transcode/transcode_test.go index 2011976..eecced2 100644 --- a/transcode/transcode_test.go +++ b/transcode/transcode_test.go @@ -2,11 +2,14 @@ package transcode_test import ( "bytes" + "context" "io" "net/http" "net/http/httptest" "os" "os/exec" + "sync" + "sync/atomic" "testing" "time" @@ -100,3 +103,41 @@ func TestTranscodeWithSeek(t *testing.T) { // since we seeked 2 seconds, we should have 5-2 = 3 seconds of PCM data require.Equal(t, (testFileLen-seekSecs)*bytesPerSec, buf.Len()) } + +func TestCachingParallelism(t *testing.T) { + t.Parallel() + + var realTranscodeCount atomic.Uint64 + transcoder := callbackTranscoder{ + transcoder: transcode.NewFFmpegTranscoder(), + callback: func() { realTranscodeCount.Add(1) }, + } + + cacheTranscoder := transcode.NewCachingTranscoder(transcoder, t.TempDir()) + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + var buf bytes.Buffer + require.NoError(t, cacheTranscoder.Transcode(context.Background(), transcode.PCM16le, "testdata/5s.flac", &buf)) + require.Equal(t, 5*bytesPerSec, buf.Len()) + }() + } + + wg.Wait() + + require.Equal(t, 1, int(realTranscodeCount.Load())) +} + +type callbackTranscoder struct { + transcoder transcode.Transcoder + callback func() +} + +func (ct callbackTranscoder) Transcode(ctx context.Context, profile transcode.Profile, in string, out io.Writer) error { + ct.callback() + return ct.transcoder.Transcode(ctx, profile, in, out) +} diff --git a/transcode/transcoder_caching.go b/transcode/transcoder_caching.go index 2f744eb..d402652 100644 --- a/transcode/transcoder_caching.go +++ b/transcode/transcoder_caching.go @@ -7,6 +7,7 @@ import ( "io" "os" "path/filepath" + "sync" ) const perm = 0o644 @@ -14,6 +15,7 @@ const perm = 0o644 type CachingTranscoder struct { cachePath string transcoder Transcoder + locks keyedMutex } var _ Transcoder = (*CachingTranscoder)(nil) @@ -38,8 +40,10 @@ func (t *CachingTranscoder) Transcode(ctx context.Context, profile Profile, in s } key := cacheKey(name, args) - path := filepath.Join(t.cachePath, key) + unlock := t.locks.Lock(key) + defer unlock() + path := filepath.Join(t.cachePath, key) cf, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0o644) if err != nil { return fmt.Errorf("open cache file: %w", err) @@ -51,7 +55,8 @@ func (t *CachingTranscoder) Transcode(ctx context.Context, profile Profile, in s return nil } - if err := t.transcoder.Transcode(ctx, profile, in, io.MultiWriter(out, cf)); err != nil { + dest := io.MultiWriter(out, cf) + if err := t.transcoder.Transcode(ctx, profile, in, dest); err != nil { os.Remove(path) return fmt.Errorf("internal transcode: %w", err) } @@ -69,3 +74,15 @@ func cacheKey(cmd string, args []string) string { } return fmt.Sprintf("%x", sum.Sum(nil)) } + +type keyedMutex struct { + sync.Map +} + +func (km *keyedMutex) Lock(key string) func() { + value, _ := km.LoadOrStore(key, &sync.Mutex{}) + mu := value.(*sync.Mutex) + mu.Lock() + // TODO: remove key entry from map to save some space? + return mu.Unlock +}