abstract away some of the encode internals

This commit is contained in:
sentriz
2020-05-08 18:42:45 +01:00
parent c65606ba1f
commit 2ee1b4d978
4 changed files with 141 additions and 110 deletions

1
go.mod
View File

@@ -25,6 +25,7 @@ require (
github.com/peterbourgon/ff v1.2.0 github.com/peterbourgon/ff v1.2.0
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be
github.com/wader/gormstore v0.0.0-20190302154359-acb787ba3755 github.com/wader/gormstore v0.0.0-20190302154359-acb787ba3755
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 // indirect
golang.org/x/exp v0.0.0-20190121172915-509febef88a4 // indirect golang.org/x/exp v0.0.0-20190121172915-509febef88a4 // indirect
google.golang.org/appengine v1.6.1 // indirect google.golang.org/appengine v1.6.1 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect

2
go.sum
View File

@@ -119,6 +119,8 @@ golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd h1:GGJVjV8waZKRHrgwvtH66z9ZGVurTD1MT0n1Bb+q4aM= golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd h1:GGJVjV8waZKRHrgwvtH66z9ZGVurTD1MT0n1Bb+q4aM=
golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71 h1:DOmugCavvUtnUD114C1Wh+UgTgQZ4pMLzXxi1pSt+/Y=
golang.org/x/crypto v0.0.0-20200406173513-056763e48d71/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20180710024300-14dda7b62fcd/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180710024300-14dda7b62fcd/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4 h1:c2HOrn5iMezYjSlGPncknSEr/8x5LELb/ilJbXi9DEA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4 h1:c2HOrn5iMezYjSlGPncknSEr/8x5LELb/ilJbXi9DEA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

View File

@@ -1,9 +1,9 @@
package ctrlsubsonic package ctrlsubsonic
import ( import (
"io"
"log" "log"
"net/http" "net/http"
"os"
"path" "path"
"time" "time"
@@ -21,6 +21,38 @@ import (
// b) return a non-nil spec.Response // b) return a non-nil spec.Response
// _but not both_ // _but not both_
func streamGetTransPref(dbc *db.DB, userID int, client string) db.TranscodePreference {
pref := db.TranscodePreference{}
dbc.
Where("user_id=?", userID).
Where("client COLLATE NOCASE IN (?)", []string{"*", client}).
Order("client DESC"). // ensure "*" is last if it's there
First(&pref)
return pref
}
func streamGetTrack(dbc *db.DB, trackID int) (*db.Track, error) {
track := db.Track{}
err := dbc.
Preload("Album").
First(&track, trackID).
Error
return &track, err
}
func streamUpdateStats(dbc *db.DB, userID, albumID int) {
play := db.Play{
AlbumID: albumID,
UserID: userID,
}
dbc.
Where(play).
First(&play)
play.Time = time.Now() // for getAlbumList?type=recent
play.Count++ // for getAlbumList?type=frequent
dbc.Save(&play)
}
func (c *Controller) ServeGetCoverArt(w http.ResponseWriter, r *http.Request) *spec.Response { func (c *Controller) ServeGetCoverArt(w http.ResponseWriter, r *http.Request) *spec.Response {
params := r.Context().Value(CtxParams).(params.Params) params := r.Context().Value(CtxParams).(params.Params)
id, err := params.GetInt("id") id, err := params.GetInt("id")
@@ -48,96 +80,50 @@ func (c *Controller) ServeGetCoverArt(w http.ResponseWriter, r *http.Request) *s
return nil return nil
} }
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
type serveTrackOptions struct {
track *db.Track
pref *db.TranscodePreference
maxBitrate int
cachePath string
musicPath string
}
func serveTrackRaw(w http.ResponseWriter, r *http.Request, opts serveTrackOptions) {
log.Printf("serving raw %q\n", opts.track.Filename)
w.Header().Set("Content-Type", opts.track.MIME())
trackPath := path.Join(opts.musicPath, opts.track.RelPath())
http.ServeFile(w, r, trackPath)
}
func serveTrackEncode(w http.ResponseWriter, r *http.Request, opts serveTrackOptions) {
profile := encode.Profiles()[opts.pref.Profile]
bitrate := encode.GetBitrate(opts.maxBitrate, profile)
trackPath := path.Join(opts.musicPath, opts.track.RelPath())
cacheKey := encode.CacheKey(trackPath, opts.pref.Profile, bitrate)
cacheFile := path.Join(opts.cachePath, cacheKey)
if fileExists(cacheFile) {
log.Printf("serving transcode `%s`: cache [%s/%s] hit!\n", opts.track.Filename, profile.Format, bitrate)
http.ServeFile(w, r, cacheFile)
return
}
log.Printf("serving transcode `%s`: cache [%s/%s] miss!\n", opts.track.Filename, profile.Format, bitrate)
if err := encode.Encode(w, trackPath, cacheFile, profile, bitrate); err != nil {
log.Printf("error encoding %q: %v\n", trackPath, err)
return
}
log.Printf("serving transcode `%s`: encoded to [%s/%s] successfully\n",
opts.track.Filename, profile.Format, bitrate)
}
func (c *Controller) ServeStream(w http.ResponseWriter, r *http.Request) *spec.Response { func (c *Controller) ServeStream(w http.ResponseWriter, r *http.Request) *spec.Response {
params := r.Context().Value(CtxParams).(params.Params) params := r.Context().Value(CtxParams).(params.Params)
id, err := params.GetInt("id") id, err := params.GetInt("id")
if err != nil { if err != nil {
return spec.NewError(10, "please provide an `id` parameter") return spec.NewError(10, "please provide an `id` parameter")
} }
track := &db.Track{} track, err := streamGetTrack(c.DB, id)
err = c.DB. if err != nil {
Preload("Album").
First(track, id).
Error
if gorm.IsRecordNotFoundError(err) {
return spec.NewError(70, "media with id `%d` was not found", id) return spec.NewError(70, "media with id `%d` was not found", id)
} }
user := r.Context().Value(CtxUser).(*db.User) user := r.Context().Value(CtxUser).(*db.User)
defer func() { defer streamUpdateStats(c.DB, user.ID, track.Album.ID)
play := db.Play{ pref := streamGetTransPref(c.DB, user.ID, params.Get("c"))
AlbumID: track.Album.ID, trackPath := path.Join(c.MusicPath, track.RelPath())
UserID: user.ID, //
} onInvalidProfile := func() error {
c.DB. log.Printf("serving raw %q\n", track.Filename)
Where(play). w.Header().Set("Content-Type", track.MIME())
First(&play) http.ServeFile(w, r, trackPath)
play.Time = time.Now() // for getAlbumList?type=recent
play.Count++ // for getAlbumList?type=frequent
c.DB.Save(&play)
}()
client := params.Get("c")
servOpts := serveTrackOptions{
track: track,
musicPath: c.MusicPath,
}
pref := &db.TranscodePreference{}
err = c.DB.
Where("user_id=?", user.ID).
Where("client COLLATE NOCASE IN (?)", []string{"*", client}).
Order("client DESC"). // ensure "*" is last if it's there
First(pref).
Error
if gorm.IsRecordNotFoundError(err) {
serveTrackRaw(w, r, servOpts)
return nil return nil
} }
servOpts.pref = pref onCacheHit := func(profile encode.Profile, path string) error {
servOpts.maxBitrate = params.GetIntOr("maxBitRate", 0) log.Printf("serving transcode `%s`: cache [%s/%dk] hit!\n",
servOpts.cachePath = c.CachePath track.Filename, profile.Format, profile.Bitrate)
serveTrackEncode(w, r, servOpts) http.ServeFile(w, r, path)
return nil
}
onCacheMiss := func(profile encode.Profile) (io.Writer, error) {
log.Printf("serving transcode `%s`: cache [%s/%dk] miss!\n",
track.Filename, profile.Format, profile.Bitrate)
return w, nil
}
encodeOptions := encode.Options{
TrackPath: trackPath,
CachePath: c.CachePath,
ProfileName: pref.Profile,
PreferredBitrate: params.GetIntOr("maxBitRate", 0),
OnInvalidProfile: onInvalidProfile,
OnCacheHit: onCacheHit,
OnCacheMiss: onCacheMiss,
}
if err := encode.Encode(encodeOptions); err != nil {
log.Printf("serving transcode `%s`: error: %v\n", track.Filename, err)
}
return nil return nil
} }
@@ -147,17 +133,13 @@ func (c *Controller) ServeDownload(w http.ResponseWriter, r *http.Request) *spec
if err != nil { if err != nil {
return spec.NewError(10, "please provide an `id` parameter") return spec.NewError(10, "please provide an `id` parameter")
} }
track := &db.Track{} track, err := streamGetTrack(c.DB, id)
err = c.DB. if err != nil {
Preload("Album").
First(track, id).
Error
if gorm.IsRecordNotFoundError(err) {
return spec.NewError(70, "media with id `%d` was not found", id) return spec.NewError(70, "media with id `%d` was not found", id)
} }
serveTrackRaw(w, r, serveTrackOptions{ log.Printf("serving raw %q\n", track.Filename)
track: track, w.Header().Set("Content-Type", track.MIME())
musicPath: c.MusicPath, trackPath := path.Join(c.MusicPath, track.RelPath())
}) http.ServeFile(w, r, trackPath)
return nil return nil
} }

View File

@@ -9,6 +9,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"path"
"github.com/cespare/xxhash" "github.com/cespare/xxhash"
) )
@@ -25,6 +26,14 @@ type Profile struct {
forceRG bool forceRG bool
} }
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}
func Profiles() map[string]Profile { func Profiles() map[string]Profile {
return map[string]Profile{ return map[string]Profile{
"mp3": {"mp3", 128, []string{"-c:a", "libmp3lame"}, false}, "mp3": {"mp3", 128, []string{"-c:a", "libmp3lame"}, false},
@@ -34,9 +43,10 @@ func Profiles() map[string]Profile {
} }
} }
// copy command output to http response body using io.copy (simpler, but may increase ttfb) // copy command output to http response body using io.copy
// (it's simpler, but may increase ttfb)
//nolint:deadcode,unused // function may be switched later //nolint:deadcode,unused // function may be switched later
func copyCmdOutput(out, cache io.Writer, pipeReader io.Reader) { func cmdOutputCopy(out, cache io.Writer, pipeReader io.Reader) {
// set up a multiwriter to feed the command output // set up a multiwriter to feed the command output
// to both cache file and http response // to both cache file and http response
w := io.MultiWriter(out, cache) w := io.MultiWriter(out, cache)
@@ -48,7 +58,7 @@ func copyCmdOutput(out, cache io.Writer, pipeReader io.Reader) {
// copy command output to http response manually with a buffer (should reduce ttfb) // copy command output to http response manually with a buffer (should reduce ttfb)
//nolint:deadcode,unused // function may be switched later //nolint:deadcode,unused // function may be switched later
func writeCmdOutput(out, cache io.Writer, pipeReader io.ReadCloser) { func cmdOutputWrite(out, cache io.Writer, pipeReader io.ReadCloser) {
buffer := make([]byte, buffLen) buffer := make([]byte, buffLen)
for { for {
n, err := pipeReader.Read(buffer) n, err := pipeReader.Read(buffer)
@@ -74,13 +84,13 @@ func writeCmdOutput(out, cache io.Writer, pipeReader io.ReadCloser) {
} }
// pre-format the ffmpeg command with needed options // pre-format the ffmpeg command with needed options
func ffmpegCommand(filePath string, profile Profile, bitrate string) *exec.Cmd { func ffmpegCommand(filePath string, profile Profile) *exec.Cmd {
args := []string{ args := []string{
"-v", "0", "-v", "0",
"-i", filePath, "-i", filePath,
"-map", "0:0", "-map", "0:0",
"-vn", "-vn",
"-b:a", bitrate, "-b:a", fmt.Sprintf("%dk", profile.Bitrate),
} }
args = append(args, profile.ffmpegOptions...) args = append(args, profile.ffmpegOptions...)
if profile.forceRG { if profile.forceRG {
@@ -99,9 +109,9 @@ func ffmpegCommand(filePath string, profile Profile, bitrate string) *exec.Cmd {
// but please do let me know if you see otherwise // but please do let me know if you see otherwise
} }
func Encode(out io.Writer, trackPath, cachePath string, profile Profile, bitrate string) error { func encode(out io.Writer, trackPath, cachePath string, profile Profile) error {
// prepare the command and file descriptors // prepare the command and file descriptors
cmd := ffmpegCommand(trackPath, profile, bitrate) cmd := ffmpegCommand(trackPath, profile)
pipeReader, pipeWriter := io.Pipe() pipeReader, pipeWriter := io.Pipe()
cmd.Stdout = pipeWriter cmd.Stdout = pipeWriter
cmd.Stderr = pipeWriter cmd.Stderr = pipeWriter
@@ -110,12 +120,12 @@ func Encode(out io.Writer, trackPath, cachePath string, profile Profile, bitrate
if err != nil { if err != nil {
return fmt.Errorf("writing to cache file %q: %v: %w", cachePath, err, err) return fmt.Errorf("writing to cache file %q: %v: %w", cachePath, err, err)
} }
// still unsure if buffer version (writeCmdOutput) is any better than io.Copy-based one (copyCmdOutput) // still unsure if buffer version (cmdOutputWrite) is any better than io.Copy-based one (cmdOutputCopy)
// initial goal here is to start streaming response asap, with smallest ttfb. more testing needed // initial goal here is to start streaming response asap, with smallest ttfb. more testing needed
// -- @spijet // -- @spijet
// //
// start up writers for cache file and http response // start up writers for cache file and http response
go writeCmdOutput(out, cacheFile, pipeReader) go cmdOutputWrite(out, cacheFile, pipeReader)
// run ffmpeg // run ffmpeg
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return fmt.Errorf("running ffmpeg: %w", err) return fmt.Errorf("running ffmpeg: %w", err)
@@ -129,18 +139,54 @@ func Encode(out io.Writer, trackPath, cachePath string, profile Profile, bitrate
return nil return nil
} }
// CacheKey generates the filename for the new transcode save // cacheKey generates the filename for the new transcode save
func CacheKey(sourcePath string, profile, bitrate string) string { func cacheKey(sourcePath string, profileName string, profile Profile) string {
format := Profiles()[profile].Format return fmt.Sprintf("%x-%s-%dk.%s",
hash := xxhash.Sum64String(sourcePath) xxhash.Sum64String(sourcePath), profileName, profile.Bitrate, profile.Format,
return fmt.Sprintf("%x-%s-%s.%s", hash, profile, bitrate, format) )
} }
// GetBitrate checks if the client forces bitrate lower than set in profile // getBitrate checks if the client forces bitrate lower than set in profile
func GetBitrate(clientBitrate int, profile Profile) string { func getBitrate(preferred, defined int) int {
bitrate := profile.Bitrate if preferred != 0 && preferred < defined {
if clientBitrate != 0 && clientBitrate < bitrate { return preferred
bitrate = clientBitrate
} }
return fmt.Sprintf("%dk", bitrate) return defined
}
type (
OnInvalidProfileFunc func() error
OnCacheHitFunc func(Profile, string) error
OnCacheMissFunc func(Profile) (io.Writer, error)
)
type Options struct {
TrackPath string
CachePath string
ProfileName string
PreferredBitrate int
OnInvalidProfile OnInvalidProfileFunc
OnCacheHit OnCacheHitFunc
OnCacheMiss OnCacheMissFunc
}
func Encode(opts Options) error {
profile, ok := Profiles()[opts.ProfileName]
if !ok {
return opts.OnInvalidProfile()
}
profile.Bitrate = getBitrate(opts.PreferredBitrate, profile.Bitrate)
cacheKey := cacheKey(opts.TrackPath, opts.ProfileName, profile)
cachePath := path.Join(opts.CachePath, cacheKey)
if fileExists(cachePath) {
return opts.OnCacheHit(profile, cachePath)
}
writer, err := opts.OnCacheMiss(profile)
if err != nil {
return fmt.Errorf("starting cache serve: %w", err)
}
if err := encode(writer, opts.TrackPath, cachePath, profile); err != nil {
return fmt.Errorf("starting transcode: %w", err)
}
return nil
} }