Skip to content

Commit

Permalink
backup storage options
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Oct 15, 2024
1 parent cedb6d8 commit 4786130
Show file tree
Hide file tree
Showing 29 changed files with 405 additions and 501 deletions.
1 change: 1 addition & 0 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func runHandler(c *cli.Context) error {
if err != nil {
return err
}
defer os.RemoveAll(conf.TmpDir)
_ = os.Setenv("TMPDIR", conf.TmpDir)

rc, err := lkredis.GetRedisClient(conf.Redis)
Expand Down
63 changes: 9 additions & 54 deletions pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,66 +34,26 @@ type BaseConfig struct {
WsUrl string `yaml:"ws_url"` // (env LIVEKIT_WS_URL)

// optional
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
BackupStorage string `yaml:"backup_storage"` // backup file location for failed uploads
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
SessionLimits `yaml:"session_limits"` // session duration limits
Logging *logger.Config `yaml:"logging"` // logging config
TemplateBase string `yaml:"template_base"` // custom template base url
ClusterID string `yaml:"cluster_id"` // cluster this instance belongs to
EnableChromeSandbox bool `yaml:"enable_chrome_sandbox"` // enable Chrome sandbox, requires extra docker configuration
MaxUploadQueue int `yaml:"max_upload_queue"` // maximum upload queue size, in minutes

SessionLimits `yaml:"session_limits"` // session duration limits
StorageConfig *StorageConfig `yaml:"storage,omitempty"` // storage config
BackupConfig *StorageConfig `yaml:"backup,omitempty"` // backup config, for storage failures

// dev/debugging
Insecure bool `yaml:"insecure"` // allow chrome to connect to an insecure websocket
Debug DebugConfig `yaml:"debug"` // create dot file on internal error

// deprecated
LogLevel string `yaml:"log_level"` // Use Logging instead
}

type DebugConfig struct {
EnableProfiling bool `yaml:"enable_profiling"` // create dot file and pprof on internal error
PathPrefix string `yaml:"path_prefix"` // filepath prefix for uploads
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
}

type StorageConfig struct {
S3 *S3Config `yaml:"s3"`
Azure *AzureConfig `yaml:"azure"`
GCP *GCPConfig `yaml:"gcp"`
AliOSS *S3Config `yaml:"alioss"`
}

type S3Config struct {
AccessKey string `yaml:"access_key"` // (env AWS_ACCESS_KEY_ID)
Secret string `yaml:"secret"` // (env AWS_SECRET_ACCESS_KEY)
SessionToken string `yaml:"session_token"` // (env AWS_SESSION_TOKEN)
Region string `yaml:"region"` // (env AWS_DEFAULT_REGION)
Endpoint string `yaml:"endpoint"`
Bucket string `yaml:"bucket"`
ForcePathStyle bool `yaml:"force_path_style"`
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
MaxRetries int `yaml:"max_retries"`
MaxRetryDelay time.Duration `yaml:"max_retry_delay"`
MinRetryDelay time.Duration `yaml:"min_retry_delay"`
AwsLogLevel string `yaml:"aws_log_level"`

// deprecated
Proxy string `yaml:"proxy"` // use ProxyConfig instead
}

type AzureConfig struct {
AccountName string `yaml:"account_name"` // (env AZURE_STORAGE_ACCOUNT)
AccountKey string `yaml:"account_key"` // (env AZURE_STORAGE_KEY)
ContainerName string `yaml:"container_name"`
}

type GCPConfig struct {
CredentialsJSON string `yaml:"credentials_json"` // (env GOOGLE_APPLICATION_CREDENTIALS)
Bucket string `yaml:"bucket"`
ProxyConfig *ProxyConfig `yaml:"proxy_config"`
}

type ProxyConfig struct {
Url string `yaml:"url"`
Username string `yaml:"username"`
Expand All @@ -108,11 +68,6 @@ type SessionLimits struct {
}

func (c *BaseConfig) initLogger(values ...interface{}) error {
if c.LogLevel != "" {
logger.Warnw("log_level deprecated. use logging instead", nil)
c.Logging.Level = c.LogLevel
}

var gstDebug []string
switch c.Logging.Level {
case "debug":
Expand Down
32 changes: 11 additions & 21 deletions pkg/config/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type FileConfig struct {
StorageFilepath string

DisableManifest bool
UploadConfig UploadConfig
StorageConfig *StorageConfig
}

func (p *PipelineConfig) GetFileConfig() *FileConfig {
Expand Down Expand Up @@ -76,7 +76,7 @@ func (p *PipelineConfig) getFileConfig(outputType types.OutputType, req fileRequ
FileInfo: &livekit.FileInfo{},
StorageFilepath: clean(req.GetFilepath()),
DisableManifest: req.GetDisableManifest(),
UploadConfig: p.getUploadConfig(req),
StorageConfig: p.getStorageConfig(req),
}

// filename
Expand Down Expand Up @@ -137,29 +137,19 @@ func (o *FileConfig) updateFilepath(p *PipelineConfig, identifier string, replac
o.FileInfo.Filename = o.StorageFilepath

// get local filepath
dir, filename := path.Split(o.StorageFilepath)
if o.UploadConfig == nil {
if dir != "" {
// create local directory
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
}
// write directly to requested location
o.LocalFilepath = o.StorageFilepath
} else {
// prepend the configuration base directory and the egress Id
tempDir := path.Join(TmpDir, p.Info.EgressId)
_, filename := path.Split(o.StorageFilepath)

// create temporary directory
if err := os.MkdirAll(tempDir, 0755); err != nil {
return err
}
// prepend the configuration base directory and the egress Id
local := path.Join(p.TmpDir, p.Info.EgressId)

// write to tmp dir
o.LocalFilepath = path.Join(tempDir, filename)
// create temporary directory
if err := os.MkdirAll(local, 0755); err != nil {
return err
}

// write to tmp dir
o.LocalFilepath = path.Join(local, filename)

return nil
}

Expand Down
25 changes: 10 additions & 15 deletions pkg/config/output_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type ImageConfig struct {
ImageExtension types.FileExtension

DisableManifest bool
UploadConfig UploadConfig
StorageConfig *StorageConfig

CaptureInterval uint32
Width int32
Expand Down Expand Up @@ -77,7 +77,7 @@ func (p *PipelineConfig) getImageConfig(images *livekit.ImageOutput) (*ImageConf
ImagePrefix: filenamePrefix,
ImageSuffix: images.FilenameSuffix,
DisableManifest: images.DisableManifest,
UploadConfig: p.getUploadConfig(images),
StorageConfig: p.getStorageConfig(images),
CaptureInterval: images.CaptureInterval,
Width: images.Width,
Height: images.Height,
Expand Down Expand Up @@ -128,25 +128,20 @@ func (o *ImageConfig) updatePrefix(p *PipelineConfig) error {
// update config
o.ImagePrefix = imagesPrefix

if o.UploadConfig == nil {
o.LocalDir = imagesDir
} else {
// Prepend the configuration base directory and the egress Id, and slug to prevent conflict if
// there is more than one image output
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(TmpDir, p.Info.EgressId, o.Id) + "/"
}
// Prepend the configuration base directory and the egress Id, and slug to prevent conflict if
// there is more than one image output
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(p.TmpDir, p.Info.EgressId, o.Id) + "/"

// create local directories
if o.LocalDir != "" {
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}

return nil
}

func getMimeTypes(imageCodec livekit.ImageCodec) (types.MimeType, types.OutputType, error) {
switch imageCodec {
case livekit.ImageCodec_IC_DEFAULT, livekit.ImageCodec_IC_JPEG:
Expand Down
22 changes: 8 additions & 14 deletions pkg/config/output_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type SegmentConfig struct {
SegmentDuration int

DisableManifest bool
UploadConfig UploadConfig
StorageConfig *StorageConfig
}

func (p *PipelineConfig) GetSegmentConfig() *SegmentConfig {
Expand All @@ -60,7 +60,7 @@ func (p *PipelineConfig) getSegmentConfig(segments *livekit.SegmentedFileOutput)
LivePlaylistFilename: clean(segments.LivePlaylistName),
SegmentDuration: int(segments.SegmentDuration),
DisableManifest: segments.DisableManifest,
UploadConfig: p.getUploadConfig(segments),
StorageConfig: p.getStorageConfig(segments),
}

if conf.SegmentDuration == 0 {
Expand Down Expand Up @@ -154,24 +154,18 @@ func (o *SegmentConfig) updatePrefixAndPlaylist(p *PipelineConfig) error {
return errors.ErrInvalidInput("live_playlist_name cannot be identical to playlist_name")
}

if o.UploadConfig == nil {
o.LocalDir = playlistDir
} else {
// Prepend the configuration base directory and the egress Id
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(TmpDir, p.Info.EgressId) + "/"
}
// Prepend the configuration base directory and the egress Id
// os.ModeDir creates a directory with mode 000 when mapping the directory outside the container
// Append a "/" to the path for consistency with the "UploadConfig == nil" case
o.LocalDir = path.Join(p.TmpDir, p.Info.EgressId) + "/"

// create local directories
if fileDir != "" {
if err := os.MkdirAll(path.Join(o.LocalDir, fileDir), 0755); err != nil {
return err
}
} else if o.LocalDir != "" {
if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}
} else if err := os.MkdirAll(o.LocalDir, 0755); err != nil {
return err
}

o.SegmentsInfo.PlaylistName = path.Join(o.StorageDir, o.PlaylistFilename)
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
lksdk "github.com/livekit/server-sdk-go/v2"
)

const Latency = uint64(3e9)
const (
Latency = uint64(3e9)
)

type PipelineConfig struct {
BaseConfig `yaml:",inline"`
Expand Down
2 changes: 0 additions & 2 deletions pkg/config/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
)

const (
TmpDir = "/home/egress/tmp"

roomCompositeCpuCost = 4
audioRoomCompositeCpuCost = 1
webCpuCost = 4
Expand Down
Loading

0 comments on commit 4786130

Please sign in to comment.