From 6bb9377428eaee7f4cfc3055d138e217ade344bc Mon Sep 17 00:00:00 2001 From: Sherman Beus Date: Wed, 28 Aug 2024 17:37:38 -0700 Subject: [PATCH] Fixes race conditions and adds some new features: - option to simulate HTTP failures (needed for testing unstable networks) - option to change the rate at which retries are done (error backoff) - fixes bug where outgoing files logged multiple times (see line 1094 of client/client.go) - payload = nil - adds raw bandwidth logging that includes all network traffic sent/received by client - ability to restart clients via internal server request - various fixes to accommodate linting complaints - race conditions in payload/bin.go and stage/local.go --- .assets/test/test-glut.sh | 121 +++++++++++++---- .assets/test/test4.client.yaml | 72 ++++++----- .assets/test/test4.server.yaml | 25 ++-- cache/local.go | 19 +++ client/client.go | 49 +++++-- conf.go | 29 ++++- http/client.go | 33 +++-- http/internal.go | 6 +- http/server.go | 18 +++ http/server_test.go | 8 +- http/util.go | 152 +++++++++++++++++++++- main/app.go | 35 ++++- main/client.go | 39 ++++-- main/controlled.go | 1 - main/controlled_test.go | 6 +- main/server.go | 19 +-- payload/bin.go | 20 ++- stage/local.go | 229 ++++++++++++++++++++------------- sts.go | 2 +- 19 files changed, 648 insertions(+), 235 deletions(-) diff --git a/.assets/test/test-glut.sh b/.assets/test/test-glut.sh index 89daa92..4764e8a 100755 --- a/.assets/test/test-glut.sh +++ b/.assets/test/test-glut.sh @@ -31,23 +31,30 @@ if [ ! 
-z "$serverdebug" ]; then mkdir -p $(dirname $serveroutput) fi -cmd_client1="$bin$VCLIENT1 $clientdebug--mode=out --loop" -cmd_client2="$bin$VCLIENT2 $clientdebug--mode=out --loop" +iport=1990 + +cmd_client1="$bin$VCLIENT1 $clientdebug--mode=out --loop --iport=$iport" +cmd_client2="$bin$VCLIENT2 $clientdebug--mode=out --loop --iport=$iport" cmd_server1="$bin$VSERVER1 $serverdebug--mode=in" cmd_server2="$bin$VSERVER2 $serverdebug--mode=in" -sim=( - "stsin-1 xs 100 5000 0" - "stsin-1 xl 100000000 2 0" - "stsin-2 lg 10000000 20 0" - "stsin-2 md 100000 500 0" - "stsin-2 sm 10000 1000 0" -) -for args in "${sim[@]}"; do - echo "Making data: $args ..." - $PWD/$basedir/makedata.py $args -done +function makedata() { + sim=( + "stsin-1 xs 100 10000 0" + "stsin-1 xl 100000000 4 0" + "stsin-2 lg 10000000 40 0" + "stsin-2 md 100000 1000 0" + "stsin-2 sm 10000 2000 0" + ) + for args in "${sim[@]}"; do + echo "Making data: $args ..." + $PWD/$basedir/makedata.py $args + done +} + +makedata & +makedata_pid=$! echo "Running server ..." $cmd_server1 > $serveroutput & @@ -56,7 +63,22 @@ pid_server=$! echo "Running client ..." $cmd_client1 > $clientoutput & pid_client=$! -pids=( "$pid_client" "$pid_server" ) +pids=( "$pid_client" "$pid_server" "$makedata_pid" ) + +function memory() { + while true; do + sleep $(( 10 * $sleepfactor )) + echo "" + echo "Memory Usage:" + ps -o rss,args -p $pid_client $pid_server \ + | awk 'NR>1 {printf "%.2f GB %s\n", $1/1024/1024, substr($0, index($0,$2))}' + echo "" + done +} + +memory & +memory_pid=$! +pids+=($memory_pid) function monkey() { count_stg=0 @@ -94,11 +116,30 @@ function ctrl_c() { exit 0 } +function cleanrestart() { + echo "Doing a 'clean' client restart ..." + curl -XPUT "http://localhost:$iport/restart-clients" + echo "Client(s) restarted!" 
+} + # Handle interrupt signal trap ctrl_c INT -# Sleep for a bit to send some of the data -sleep $(( 20 * $sleepfactor )) +# Sleep until the count of outgoing files goes down +countprev=0 +while true; do + sleep $(( 5 * $sleepfactor )) + out=`find $STS_HOME/data/out -type f 2>/dev/null | sort` + if [ "$out" ]; then + lines=($out) + count=${#lines[@]} + # break if the count is going down + if (( countprev > 0 && count < countprev )); then + break + fi + countprev=$count + fi +done echo "Stopping ..." @@ -106,11 +147,17 @@ echo "Stopping ..." kill -9 $pid_client kill -9 $pid_server kill $monkey_pid > /dev/null 2>&1 +kill $memory_pid > /dev/null 2>&1 -# Make more data -for args in "${sim[@]}"; do - echo "Making data: $args ..." - $PWD/$basedir/makedata.py $args +# Wait until PIDs are gone +while true; do + sleep $(( 1 * $sleepfactor )) + for pid in "${pids[@]}"; do + if ps -p $pid > /dev/null 2>&1; then + continue + fi + break 2 + done done echo "Restarting server ..." @@ -127,12 +174,28 @@ echo "Restarting monkey ..." monkey & monkey_pid=$! -pids=( "$pid_client" "$pid_server" "$monkey_pid" ) +echo "Restarting memory monitor ..." +memory & +memory_pid=$! + +# Make more data +makedata & +makedata_pid=$! -echo "Waiting ..." 
+pids=( "$pid_client" "$pid_server" "$monkey_pid" "$memory_pid" "$makedata_pid" ) + +restarttime=$(date +%s) +restartinterval=60 # Wait for the client to be done while true; do + now=$(date +%s) + elapsed=$((now - restarttime)) + echo "Elapsed: $elapsed" + if (( elapsed > restartinterval )); then + cleanrestart + restarttime=$(date +%s) + fi sleep $(( 5 * $sleepfactor )) out=`find $STS_HOME/data/out -type f 2>/dev/null | sort` if [ "$out" ]; then @@ -146,7 +209,7 @@ while true; do done # Trigger a stage cleanup manually -curl 'localhost:1992/clean?block&minage=1' +# curl 'localhost:1992/clean?block&minage=1' while true; do sleep $(( 1 * $sleepfactor )) @@ -157,6 +220,7 @@ while true; do continue fi kill $pid_server + kill $memory_pid > /dev/null 2>&1 echo "Done!" break done @@ -197,7 +261,7 @@ if [ "$stale" ]; then fi sortd=`cut -d ":" -f 1 $STS_HOME/data/log/incoming_from/*/*/* | sort` -uniqd=`uniq -c <(cat <(echo "$sortd")) | grep -v " 1"` +uniqd=`uniq -c <(cat <(echo "$sortd")) | grep -v " 1 "` if [ "$uniqd" ]; then echo "Incoming Duplicates (probably OK):" cat <(echo "$uniqd") @@ -205,4 +269,13 @@ if [ "$uniqd" ]; then exit 0 fi +sortd=`cut -d ":" -f 1 $STS_HOME/data/log/outgoing_to/*/*/* | sort` +uniqd=`uniq -c <(cat <(echo "$sortd")) | grep -v " 1 "` +if [ "$uniqd" ]; then + echo "Outgoing Duplicates:" + cat <(echo "$uniqd") + echo "FAILED!" + exit 0 +fi + echo "SUCCESS!" 
diff --git a/.assets/test/test4.client.yaml b/.assets/test/test4.client.yaml index c3f11ea..a8fefc3 100644 --- a/.assets/test/test4.client.yaml +++ b/.assets/test/test4.client.yaml @@ -1,42 +1,44 @@ OUT: # Outgoing configuration dirs: # Outgoing directory configuration - cache : .sts/out # Used to store internal caches - logs : data/log - out : data/out # Directory to watch for files to send; appends "/{target}" unless overridden - out-follow : true + cache: .sts/out # Used to store internal caches + logs: data/log + out: data/out # Directory to watch for files to send; appends "/{target}" unless overridden + out-follow: true sources: # Supports multiple sources where omitted entries will inherit from previous sources hierarchically - - name : stsout-1 # Name of the source - threads : 8 # Maximum number of concurrent connections - scan-delay : 10s - cache-age : 5m - min-age : 5s # How old a file must be before being added to the "outgoing" queue - max-age : 0s # How old a file can be before getting logged as "stale" (remains in the queue) - bin-size : 20MB # The generally-desired size for a given HTTP request (BEFORE any compression) - compress : 0 # Use GZIP compression level (0-9) (NOTE: bin-size is based on file size BEFORE compression) - stat-payload : true # Log payload transmission stats - poll-delay : 2s # How long to wait after file sent before final validation - poll-interval : 5s # How long to wait between polling requests - poll-attempts : 1 # How many times to "poll" for the successful reception of a file before re-sending + - name: stsout-1 # Name of the source + threads: 16 # Maximum number of concurrent connections + scan-delay: 10s + cache-age: 5m + min-age: 5s # How old a file must be before being added to the "outgoing" queue + max-age: 0s # How old a file can be before getting logged as "stale" (remains in the queue) + bin-size: 20MB # The generally-desired size for a given HTTP request (BEFORE any compression) + compress: 0 # Use GZIP 
compression level (0-9) (NOTE: bin-size is based on file size BEFORE compression) + stat-payload: true # Log payload transmission stats + stat-interval: 30s # How often to log payload transmission stats + poll-delay: 2s # How long to wait after file sent before final validation + poll-interval: 5s # How long to wait between polling requests + poll-attempts: 1 # How many times to "poll" for the successful reception of a file before re-sending + error-backoff: 0.1 # Multiplier for How long to wait before retrying after an error target: # Target-specific configuration - name : stsin-1 # Name of the target - http-host : localhost:1992 + name: stsin-1 # Name of the target + http-host: localhost:1992 tags: # Tags are for configuration based on file patterns (omitted attributes are inherited) - - pattern : DEFAULT # The file "tag" pattern - priority : 1 # Relative importance (higher the number, greater the importance) - order : fifo # File order (fifo (first in, first out), lifo, or none) - delete : true # Whether or not to delete files after reception confirmation - delete-delay : 30s - method : http - - pattern : ^xs/ - priority : 3 - - pattern : ^sm/ - priority : 2 - - pattern : ^xl/ - priority : 0 + - pattern: DEFAULT # The file "tag" pattern + priority: 1 # Relative importance (higher the number, greater the importance) + order: fifo # File order (fifo (first in, first out), lifo, or none) + delete: true # Whether or not to delete files after reception confirmation + delete-delay: 30s + method: http + - pattern: ^xs/ + priority: 3 + - pattern: ^sm/ + priority: 2 + - pattern: ^xl/ + priority: 0 rename: - - from: '^(?P<pfx>[a-z]{2})/(?P<rest>.+)$' - to: '{{.pfx}}/{{.rest}}.ext' - - name : stsout-2 - out-dir : data/out/stsin-2 # Have to override when running on the same host since the target is the same + - from: "^(?P<pfx>[a-z]{2})/(?P<rest>.+)$" + to: "{{.pfx}}/{{.rest}}.ext" + - name: stsout-2 + out-dir: data/out/stsin-2 # Have to override when running on the same host since the target is 
the same target: - name : stsin-1 + name: stsin-1 diff --git a/.assets/test/test4.server.yaml b/.assets/test/test4.server.yaml index 62e2ec9..99b6afd 100644 --- a/.assets/test/test4.server.yaml +++ b/.assets/test/test4.server.yaml @@ -1,13 +1,14 @@ IN: # Incoming configuration. - log-buffering: true # Set to true only when potentially redundant transfers it not critical - sources: - - stsout-1 - - stsout-2 - dirs: # Incoming directory configuration. - stage : data/stage # Directory to stage data as it comes in; appends "source" name - final : data/in # Final location for incoming files; appends "/{source}" - logs : data/log # Root directory for log files - server: # Server configuration. - http-host : localhost - http-port : 1992 # What port to listen on - compress : 0 # Use GZIP compression level (0-9) on response data + log-buffering: true # Set to true only when potentially redundant transfers it not critical + sources: + - stsout-1 + - stsout-2 + dirs: # Incoming directory configuration. + stage: data/stage # Directory to stage data as it comes in; appends "source" name + final: data/in # Final location for incoming files; appends "/{source}" + logs: data/log # Root directory for log files + server: # Server configuration. 
+ http-host: localhost + http-port: 1992 # What port to listen on + compress: 0 # Use GZIP compression level (0-9) on response data + chance-of-simulated-failure: 0.10 # Chance of simulated network failure diff --git a/cache/local.go b/cache/local.go index 9af587d..72aa531 100644 --- a/cache/local.go +++ b/cache/local.go @@ -20,33 +20,48 @@ type cacheFile struct { Meta []byte `json:"meta"` Hash string `json:"hash"` Done bool `json:"done"` + mux sync.RWMutex } func (f *cacheFile) GetPath() string { + f.mux.RLock() + defer f.mux.RUnlock() return f.path } func (f *cacheFile) GetName() string { + f.mux.RLock() + defer f.mux.RUnlock() return f.name } func (f *cacheFile) GetSize() int64 { + f.mux.RLock() + defer f.mux.RUnlock() return f.Size } func (f *cacheFile) GetTime() time.Time { + f.mux.RLock() + defer f.mux.RUnlock() return f.Time.Time } func (f *cacheFile) GetMeta() []byte { + f.mux.RLock() + defer f.mux.RUnlock() return f.Meta } func (f *cacheFile) GetHash() string { + f.mux.RLock() + defer f.mux.RUnlock() return f.Hash } func (f *cacheFile) IsDone() bool { + f.mux.RLock() + defer f.mux.RUnlock() return f.Done } @@ -128,7 +143,9 @@ func (j *JSON) Done(key string, whileLocked func(sts.Cached)) { return } j.dirty = true + f.(*cacheFile).mux.Lock() f.(*cacheFile).Done = true + f.(*cacheFile).mux.Unlock() if whileLocked != nil { whileLocked(f) } @@ -142,7 +159,9 @@ func (j *JSON) Reset(key string) { } j.mutex.Lock() defer j.mutex.Unlock() + f.(*cacheFile).mux.Lock() f.(*cacheFile).Hash = "" + f.(*cacheFile).mux.Unlock() j.dirty = true } diff --git a/client/client.go b/client/client.go index 79dc1ff..44050c1 100644 --- a/client/client.go +++ b/client/client.go @@ -47,6 +47,7 @@ type Conf struct { PollInterval time.Duration // How long to wait between between validation requests PollAttempts int // How many times to attempt validation for a single file before giving up Tags []*FileTag // Options for files of a particular pattern + ErrorBackoff float64 // Multiplier 
for backoff time } // FileTag is the struct for defining settings relevant to files of a "tag" @@ -289,8 +290,7 @@ func (broker *Broker) recover() (send []sts.Hashed, err error) { if err != nil { broker.error("Recovery request failed:", err.Error()) nErr++ - // Wait longer the more it fails - time.Sleep(time.Duration(nErr) * time.Second) + broker.applyErrorBackoff(nErr) continue } break @@ -868,22 +868,24 @@ func (broker *Broker) startSend(wg *sync.WaitGroup) { in := broker.chTransmit out := broker.chTransmitted var nErr int + var payload sts.Payload + var ok bool for { log.Debug("Send loop ...") - payload, ok := <-in + payload, ok = <-in if !ok || broker.shouldStopNow() { return } - for _, p := range payload.GetParts() { - beg, len := p.GetSlice() - log.Debug("Sending:", p.GetName(), beg, beg+len, - "T:", p.GetFileTime()) - } nErr = 0 for { if broker.shouldStopNow() { return } + for _, p := range payload.GetParts() { + beg, len := p.GetSlice() + log.Debug("Sending:", p.GetName(), beg, beg+len, + "T:", p.GetFileTime()) + } n, err := broker.Conf.Transmitter(payload) if err == nil { break @@ -918,8 +920,7 @@ func (broker *Broker) startSend(wg *sync.WaitGroup) { payload = nil break } - // Wait longer the more it fails - time.Sleep(time.Duration(nErr) * time.Second) + broker.applyErrorBackoff(nErr) } if payload != nil { broker.stat(payload) @@ -995,6 +996,10 @@ func (broker *Broker) handleSendError(payload sts.Payload, nPartsReceived int) s if n > 0 { next := payload.Split(n) broker.stat(payload) + for i, p := range payload.GetParts() { + beg, len := p.GetSlice() + log.Debug("Successfully recovered part:", i, p.GetName(), beg, beg+len) + } // broker.chTransmitted <- payload if !sendCh( broker.shouldStopNow, @@ -1004,14 +1009,19 @@ func (broker *Broker) handleSendError(payload sts.Payload, nPartsReceived int) s ) { break } + if next != nil { + for i, p := range next.GetParts() { + beg, len := p.GetSlice() + log.Debug("Recovering failed part:", i, p.GetName(), beg, 
beg+len) + } + } payload = next } break } nErr++ broker.error("Payload recovery request failed:", err.Error()) - // Wait longer the more it fails - time.Sleep(time.Duration(nErr) * time.Second) + broker.applyErrorBackoff(nErr) } return payload } @@ -1081,6 +1091,7 @@ func (broker *Broker) startTrack(wg *sync.WaitGroup) { wait = time.After(time.Second) } } + payload = nil select { case payload, ok = <-in: if !ok { @@ -1129,6 +1140,7 @@ func (broker *Broker) startTrack(wg *sync.WaitGroup) { pFile.completed = payload.GetCompleted() } broker.Conf.Logger.Sent(pFile) + log.Debug("Logged:", pFile.GetName(), pFile.sent, pFile.size) } } } @@ -1207,8 +1219,7 @@ func (broker *Broker) startValidate(wg *sync.WaitGroup) { if err != nil { broker.error("Poll request failed:", err.Error()) nErr++ - // Wait longer the more it fails - time.Sleep(time.Duration(nErr) * time.Second) + broker.applyErrorBackoff(nErr) continue } break @@ -1329,6 +1340,16 @@ func (broker *Broker) startRetry(wg *sync.WaitGroup) { } } +func (broker *Broker) applyErrorBackoff(nErr int) { + if broker.Conf.ErrorBackoff == 0 { + return + } + // Wait longer the more it fails + time.Sleep( + time.Duration(nErr*int(broker.Conf.ErrorBackoff)) * time.Second, + ) +} + type placeholderFile struct { name string size int64 diff --git a/conf.go b/conf.go index 625490a..9cb026a 100644 --- a/conf.go +++ b/conf.go @@ -58,10 +58,14 @@ func (conf *ClientConf) propagate() { if src != nil { tgt = conf.Sources[i] origStat := tgt.StatPayload + origBackoff := tgt.ErrorBackoff reflectutil.CopyStruct(tgt, src) if tgt.isStatPayloadSet { tgt.StatPayload = origStat } + if tgt.isErrorBackoffSet { + tgt.ErrorBackoff = origBackoff + } if src.Target != nil && tgt.Target != nil { reflectutil.CopyStruct(tgt.Target, src.Target) } @@ -141,12 +145,14 @@ type SourceConf struct { IncludeHidden bool Include []*regexp.Regexp Ignore []*regexp.Regexp + ErrorBackoff float64 // We have to do this mumbo jumbo if we ever want a false value to // override a 
true value because a false boolean value is the "empty" // value and it's impossible to know if it was set in the config file or // if it was just the default value because it wasn't specified. - isStatPayloadSet bool + isStatPayloadSet bool + isErrorBackoffSet bool } func (c *SourceConf) GenMappingVars() map[string]string { @@ -179,6 +185,7 @@ type auxSourceConf struct { IncludeHidden string `yaml:"include-hidden" json:"include-hidden"` Include []string `yaml:"include" json:"include"` Ignore []string `yaml:"ignore" json:"ignore"` + ErrorBackoff string `yaml:"error-backoff" json:"error-backoff"` } func (ss *SourceConf) applyAux(aux *auxSourceConf) (err error) { @@ -231,6 +238,10 @@ func (ss *SourceConf) applyAux(aux *auxSourceConf) (err error) { } ss.Include = patterns[0:len(aux.Include)] ss.Ignore = patterns[len(aux.Include):] + if aux.ErrorBackoff != "" { + ss.ErrorBackoff, err = strconv.ParseFloat(aux.ErrorBackoff, 64) + ss.isErrorBackoffSet = true + } return } @@ -299,6 +310,9 @@ func (ss *SourceConf) MarshalJSON() ([]byte, error) { } aux.Include = strings[0:len(ss.Include)] aux.Ignore = strings[len(ss.Include):] + if ss.isErrorBackoffSet { + aux.ErrorBackoff = fmt.Sprintf("%f", ss.ErrorBackoff) + } return json.Marshal(aux) } @@ -503,12 +517,13 @@ type ServerDirs struct { // HTTPServer is the struct for managing the incoming HTTP host type HTTPServer struct { - Host string `yaml:"http-host" json:"http-host"` - Port int `yaml:"http-port" json:"http-port"` - PathPrefix string `yaml:"http-path-prefix" json:"http-path-prefix"` - TLSCertPath string `yaml:"http-tls-cert" json:"http-tls-cert"` - TLSKeyPath string `yaml:"http-tls-key" json:"http-tls-key"` - Compression int `yaml:"compress" json:"compress"` + Host string `yaml:"http-host" json:"http-host"` + Port int `yaml:"http-port" json:"http-port"` + PathPrefix string `yaml:"http-path-prefix" json:"http-path-prefix"` + TLSCertPath string `yaml:"http-tls-cert" json:"http-tls-cert"` + TLSKeyPath string 
`yaml:"http-tls-key" json:"http-tls-key"` + Compression int `yaml:"compress" json:"compress"` + ChanceOfSimulatedFailure float64 `yaml:"chance-of-simulated-failure" json:"chance-of-simulated-failure"` } // Queue is the struct for managing an AWS queue resource diff --git a/http/client.go b/http/client.go index 7ce8b10..7e897c0 100644 --- a/http/client.go +++ b/http/client.go @@ -55,18 +55,19 @@ func (c *confirmed) Received() bool { // Client is the main client struct type Client struct { - SourceName string - TargetHost string - TargetPort int - TargetPrefix string - TargetKey string - Compression int - Timeout time.Duration - TLS *tls.Config - PartialsDecoder sts.DecodePartials + SourceName string + TargetHost string + TargetPort int + TargetPrefix string + TargetKey string + Compression int + Timeout time.Duration + TLS *tls.Config + PartialsDecoder sts.DecodePartials + BandwidthLogInterval time.Duration root string - client *http.Client + client *BandwidthLoggingClient } func (h *Client) rootURL() string { @@ -89,13 +90,22 @@ func (h *Client) init() error { return nil } var err error - if h.client, err = GetClient(h.TLS); err != nil { + if h.client, err = GetClient( + h.TLS, + h.BandwidthLogInterval, + fmt.Sprintf("(%s) ", h.SourceName)); err != nil { return err } h.client.Timeout = h.Timeout return nil } +func (h *Client) Destroy() { + if h.client != nil { + h.client.Stop() + } +} + // Transmit is of type sts.Transmit for sending a payload func (h *Client) Transmit(payload sts.Payload) (n int, err error) { if err = h.init(); err != nil { @@ -114,6 +124,7 @@ func (h *Client) Transmit(payload sts.Payload) (n int, err error) { mr := bytes.NewReader(meta) dr := payload.GetEncoder() pr, pw := io.Pipe() + defer dr.Close() go func() { if gz != nil { gz.Reset(pw) diff --git a/http/internal.go b/http/internal.go index 6029507..bad7d6a 100644 --- a/http/internal.go +++ b/http/internal.go @@ -10,12 +10,16 @@ import ( // Internal responds to internal HTTP command requests 
for altering the behavior // of the running executable type Internal struct { - Port int + Port int + Handlers map[string]http.HandlerFunc } // Serve starts HTTP server. func (s *Internal) Serve(stop <-chan bool, done chan<- bool) { http.Handle("/debug", http.HandlerFunc(s.routeDebug)) + for key, handler := range s.Handlers { + http.Handle("/"+key, handler) + } addr := fmt.Sprintf("localhost:%d", s.Port) server := NewGracefulServer( diff --git a/http/server.go b/http/server.go index 9347c17..7d00177 100644 --- a/http/server.go +++ b/http/server.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand/v2" "net/http" "os" "path/filepath" @@ -36,6 +37,8 @@ type Server struct { IsValid sts.RequestValidator ClientManager sts.ClientManager + ChanceOfSimulatedFailure float64 + lock sync.RWMutex } @@ -320,6 +323,7 @@ func (s *Server) routeData(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) return } + s.potentiallySimulateFailure(w, float64(index+1)/float64(len(parts))) file := &sts.Partial{ Name: parts[index].GetName(), Renamed: parts[index].GetRenamed(), @@ -530,3 +534,17 @@ func getKey(r *http.Request) string { } return key } + +func (s *Server) potentiallySimulateFailure(w http.ResponseWriter, pctDone float64) { + if s.ChanceOfSimulatedFailure == 0 { + return + } + if s.ChanceOfSimulatedFailure*pctDone < rand.Float64() { + return + } + conn, _, err := w.(http.Hijacker).Hijack() + if err != nil { + return + } + conn.Close() +} diff --git a/http/server_test.go b/http/server_test.go index 3815f45..199238d 100644 --- a/http/server_test.go +++ b/http/server_test.go @@ -47,11 +47,11 @@ func stageFiles(count int, bytes units.Base2Bytes) { func requestInternal(url string) error { var err error - var client *http.Client + var client *BandwidthLoggingClient var req *http.Request var resp *http.Response - if client, err = GetClient(nil); err != nil { + if client, err = GetClient(nil, time.Duration(0), ""); err != nil { return err } @@ 
-73,12 +73,12 @@ func requestInternal(url string) error { func request(method, url string, data io.Reader, respData ...any) error { var err error - var client *http.Client + var client *BandwidthLoggingClient var req *http.Request var resp *http.Response var reader io.ReadCloser - if client, err = GetClient(nil); err != nil { + if client, err = GetClient(nil, time.Duration(0), ""); err != nil { return err } diff --git a/http/util.go b/http/util.go index ff8360e..ae887de 100644 --- a/http/util.go +++ b/http/util.go @@ -6,16 +6,19 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "fmt" "io" "net" "net/http" "os" "strings" "sync" + "sync/atomic" "time" "code.arm.gov/dataflow/sts/fileutil" "code.arm.gov/dataflow/sts/log" + "github.com/alecthomas/units" ) const ( @@ -91,11 +94,18 @@ func ListenAndServe(addr string, tlsConf *tls.Config, handler http.Handler) erro // Close gracefully shuts down the DefaultServer. func Close() error { + if DefaultServer == nil { + return nil + } return DefaultServer.Close() } // GetClient returns a secure http.Client instance pointer based on cert paths. -func GetClient(tlsConf *tls.Config) (client *http.Client, err error) { +func GetClient( + tlsConf *tls.Config, + logInterval time.Duration, + logPrefix string, +) (client *BandwidthLoggingClient, err error) { // Create new client using tls config // NOTE: cloning the default transport means we get things like honoring // proxy env vars (e.g. HTTP_PROXY, HTTPS_PROXY, etc)... @@ -104,7 +114,9 @@ func GetClient(tlsConf *tls.Config) (client *http.Client, err error) { tr.TLSClientConfig = tlsConf } tr.DisableKeepAlives = true - client = &http.Client{Transport: tr} + client = newBandwidthLoggingClient(tr, logInterval, func(i ...interface{}) { + log.Info(append([]interface{}{logPrefix}, i...)...) 
+ }) return } @@ -393,3 +405,139 @@ func handleError(w http.ResponseWriter, code int, err error) { w.WriteHeader(code) w.Write([]byte(err.Error())) } + +type bandwidthMonitorTransport struct { + transport http.RoundTripper + bytesSent uint64 + bytesReceived uint64 + totalDuration uint64 // in nanoseconds +} + +func (t *bandwidthMonitorTransport) RoundTrip(req *http.Request) (*http.Response, error) { + if t.transport == nil { + t.transport = http.DefaultTransport + } + + start := time.Now() + defer func() { + duration := time.Since(start) + atomic.AddUint64(&t.totalDuration, uint64(duration)) + }() + + if req.Body != nil { + originalBody := req.Body + req.Body = &readCounter{req.Body, &t.bytesSent} + defer originalBody.Close() + } + + resp, err := t.transport.RoundTrip(req) + if err != nil { + return resp, err + } + + resp.Body = &readCounter{resp.Body, &t.bytesReceived} + return resp, nil +} + +func (t *bandwidthMonitorTransport) GetBytesSent() uint64 { + return atomic.LoadUint64(&t.bytesSent) +} + +func (t *bandwidthMonitorTransport) GetBytesReceived() uint64 { + return atomic.LoadUint64(&t.bytesReceived) +} + +func (t *bandwidthMonitorTransport) GetTotalDuration() time.Duration { + return time.Duration(atomic.LoadUint64(&t.totalDuration)) +} + +func (t *bandwidthMonitorTransport) ResetCounters() { + atomic.StoreUint64(&t.bytesSent, 0) + atomic.StoreUint64(&t.bytesReceived, 0) + atomic.StoreUint64(&t.totalDuration, 0) +} + +type readCounter struct { + reader io.Reader + counter *uint64 +} + +func (r *readCounter) Read(p []byte) (n int, err error) { + n, err = r.reader.Read(p) + atomic.AddUint64(r.counter, uint64(n)) + return +} + +func (r *readCounter) Close() error { + if c, ok := r.reader.(io.Closer); ok { + return c.Close() + } + return nil +} + +type BandwidthLoggingClient struct { + *http.Client + monitor *bandwidthMonitorTransport + stopChan chan struct{} +} + +func newBandwidthLoggingClient( + baseTransport http.RoundTripper, + logInterval time.Duration, + 
logFn func(...interface{}), +) *BandwidthLoggingClient { + monitor := &bandwidthMonitorTransport{transport: baseTransport} + client := &http.Client{Transport: monitor} + blc := &BandwidthLoggingClient{ + Client: client, + monitor: monitor, + stopChan: make(chan struct{}), + } + if logInterval > 0 { + go blc.logBandwidthUsage(logInterval, logFn) + } + return blc +} + +func (blc *BandwidthLoggingClient) Stop() { + close(blc.stopChan) +} + +func (blc *BandwidthLoggingClient) logBandwidthUsage( + interval time.Duration, + logFn func(...interface{}), +) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + sent := blc.monitor.GetBytesSent() + received := blc.monitor.GetBytesReceived() + duration := blc.monitor.GetTotalDuration() + + if duration > 0 { + sentBps := int64(float64(sent) / duration.Seconds()) + receivedBps := int64(float64(received) / duration.Seconds()) + + lines := []string{ + fmt.Sprintf("Raw bandwidth usage in the last %s:", interval), + fmt.Sprintf(" - Sent: %s (%s/s)", units.Base2Bytes(sent), units.Base2Bytes(sentBps)), + fmt.Sprintf(" - Received: %s (%s/s)", units.Base2Bytes(received), units.Base2Bytes(receivedBps)), + fmt.Sprintf(" - Total duration: %s", duration), + } + logFn(strings.Join(lines, "\n")) + } else { + logFn(fmt.Sprintf("No requests made in the last %s", interval)) + } + // sent := blc.monitor.GetBytesSent() + // received := blc.monitor.GetBytesReceived() + // logFn("Bandwidth usage in the last %s - Sent: %s, Received: %s", + // interval, units.Base2Bytes(sent), units.Base2Bytes(received)) + blc.monitor.ResetCounters() + case <-blc.stopChan: + return + } + } +} diff --git a/main/app.go b/main/app.go index 0433c80..0f99e38 100644 --- a/main/app.go +++ b/main/app.go @@ -24,6 +24,8 @@ import ( "path/filepath" "strings" + httpbase "net/http" + "github.com/coreos/go-systemd/v22/daemon" ) @@ -226,7 +228,9 @@ func (a *app) run() { sc := make(chan os.Signal, 1) signal.Notify(sc, os.Interrupt, 
syscall.SIGTERM) - daemon.SdNotify(false, daemon.SdNotifyReady) + if _, err := daemon.SdNotify(false, daemon.SdNotifyReady); err != nil { + log.Error("Failed to notify systemd:", err.Error()) + } var dogTicker *time.Ticker var dogDone chan bool @@ -240,7 +244,9 @@ func (a *app) run() { case <-dogDone: return case <-dogTicker.C: - daemon.SdNotify(false, daemon.SdNotifyWatchdog) + if _, err = daemon.SdNotify(false, daemon.SdNotifyWatchdog); err != nil { + log.Error("Failed to notify watchdog:", err.Error()) + } } } }() @@ -255,7 +261,9 @@ func (a *app) run() { dogTicker.Stop() dogDone <- true } - daemon.SdNotify(false, daemon.SdNotifyStopping) + if _, err := daemon.SdNotify(false, daemon.SdNotifyStopping); err != nil { + log.Error("Failed to notify systemd:", err.Error()) + } } a.stopClients(stopFull) a.stopServer() @@ -384,6 +392,9 @@ func (a *app) stopClients(stopGraceful bool) { }(stop, stopGraceful, done, &wg) } wg.Wait() + for _, c := range a.clients { + c.destroy() + } a.clients = nil a.clientStop = nil } @@ -392,8 +403,12 @@ func (a *app) startInternalServer() { if a.iPort == 0 { return } + fmt.Println("Starting internal server...", a.iPort) server := &http.Internal{ Port: a.iPort, + Handlers: map[string]httpbase.HandlerFunc{ + "restart-clients": a.restartHandler, + }, } stop := make(chan bool) done := make(chan bool) @@ -409,3 +424,17 @@ func (a *app) stopInternalServer() { a.iServerStop <- true <-a.iServerDone } + +func (a *app) restartHandler(w httpbase.ResponseWriter, r *httpbase.Request) { + log.Debug("Restarting clients...") + switch r.Method { + case httpbase.MethodPut: + fallthrough + case httpbase.MethodPost: + a.stopClients(false) + a.startClients() + w.WriteHeader(httpbase.StatusOK) + default: + w.WriteHeader(httpbase.StatusBadRequest) + } +} diff --git a/main/client.go b/main/client.go index c019aa5..ca85d51 100644 --- a/main/client.go +++ b/main/client.go @@ -43,6 +43,7 @@ type clientApp struct { port int tls *tls.Config broker *client.Broker + 
httpClient *http.Client } func (c *clientApp) setDefaults() (err error) { @@ -88,6 +89,9 @@ func (c *clientApp) setDefaults() (err error) { if c.conf.ScanDelay == 0 { c.conf.ScanDelay = time.Second * 30 } + if c.conf.ErrorBackoff == 0 { + c.conf.ErrorBackoff = 1 + } if c.conf.GroupBy == nil || c.conf.GroupBy.String() == "" { // Default is up to the first dot of the relative path. c.conf.GroupBy = regexp.MustCompile(`^([^\.]*)`) @@ -273,16 +277,17 @@ func (c *clientApp) init() (err error) { } } - httpClient := &http.Client{ - SourceName: c.conf.Name, - TargetHost: c.host, - TargetPort: c.port, - TargetPrefix: c.conf.Target.PathPrefix, - TargetKey: c.conf.Target.Key, - Timeout: c.conf.Timeout, - Compression: c.conf.Compression, - TLS: c.tls, - PartialsDecoder: stage.ReadCompanions, + c.httpClient = &http.Client{ + SourceName: c.conf.Name, + TargetHost: c.host, + TargetPort: c.port, + TargetPrefix: c.conf.Target.PathPrefix, + TargetKey: c.conf.Target.Key, + Timeout: c.conf.Timeout, + Compression: c.conf.Compression, + TLS: c.tls, + PartialsDecoder: stage.ReadCompanions, + BandwidthLogInterval: c.conf.StatInterval, } dump("Server Host: %s", c.host) @@ -302,6 +307,7 @@ func (c *clientApp) init() (err error) { dump("Poll Delay: %s", c.conf.PollDelay.String()) dump("Poll Interval: %s", c.conf.PollInterval.String()) dump("Log Directory: %s", c.conf.LogDir) + dump("Error Backoff Multiplier: %f", c.conf.ErrorBackoff) // Keeping the config dump as a single log entry means fetching recent // "lines" from the log will make sure the entire config is captured @@ -313,11 +319,11 @@ func (c *clientApp) init() (err error) { Store: store, Cache: cache, Queue: queue.NewTagged(qtags, tagger, grouper), - Recoverer: httpClient.Recover, + Recoverer: c.httpClient.Recover, BuildPayload: payload.NewBin, - Transmitter: httpClient.Transmit, - TxRecoverer: httpClient.RecoverTransmission, - Validator: httpClient.Validate, + Transmitter: c.httpClient.Transmit, + TxRecoverer: 
c.httpClient.RecoverTransmission, + Validator: c.httpClient.Validate, Logger: log.NewFileIO(c.conf.LogDir, nil, nil, false), Renamer: fileToNewName, Tagger: nameToTag, @@ -330,7 +336,12 @@ func (c *clientApp) init() (err error) { PollDelay: c.conf.PollDelay, PollInterval: c.conf.PollInterval, Tags: tags, + ErrorBackoff: c.conf.ErrorBackoff, }, } return } + +func (c *clientApp) destroy() { + c.httpClient.Destroy() +} diff --git a/main/controlled.go b/main/controlled.go index b96121f..444dc74 100644 --- a/main/controlled.go +++ b/main/controlled.go @@ -323,7 +323,6 @@ func getMachineID(forceCompute bool) (id string, err error) { if !compute { _, err = os.Stat(cachePath) compute = os.IsNotExist(err) - err = nil } if compute { id = uuid.New().String() diff --git a/main/controlled_test.go b/main/controlled_test.go index 13d3132..cd09634 100644 --- a/main/controlled_test.go +++ b/main/controlled_test.go @@ -79,9 +79,9 @@ func TestConf(t *testing.T) { } a := &app{ - loop: true, - mode: modeSend, - root: rootDir, + // loop: true, + // mode: modeSend, + // root: rootDir, conf: &sts.Conf{}, } diff --git a/main/server.go b/main/server.go index 186a6e1..5cb26e5 100644 --- a/main/server.go +++ b/main/server.go @@ -98,15 +98,16 @@ func (a *serverApp) init() (err error) { } } a.server = &http.Server{ - ServeDir: dirs.Serve, - Host: conf.Server.Host, - Port: conf.Server.Port, - PathPrefix: conf.Server.PathPrefix, - Compression: conf.Server.Compression, - DecoderFactory: payload.NewDecoder, - IsValid: a.standardValidator, - GateKeepers: stagers, - GateKeeperFactory: newStage, + ServeDir: dirs.Serve, + Host: conf.Server.Host, + Port: conf.Server.Port, + PathPrefix: conf.Server.PathPrefix, + Compression: conf.Server.Compression, + DecoderFactory: payload.NewDecoder, + IsValid: a.standardValidator, + GateKeepers: stagers, + GateKeeperFactory: newStage, + ChanceOfSimulatedFailure: conf.Server.ChanceOfSimulatedFailure, } if conf.Server.TLSCertPath != "" && conf.Server.TLSKeyPath != "" { 
if a.server.TLS, err = http.LoadTLSConf( diff --git a/payload/bin.go b/payload/bin.go index be3a3e1..b1520c6 100644 --- a/payload/bin.go +++ b/payload/bin.go @@ -7,6 +7,7 @@ import ( "math" "path/filepath" "strings" + "sync" "time" "code.arm.gov/dataflow/sts" @@ -255,7 +256,7 @@ func (bin *Bin) EncodeHeader() (byteMeta []byte, err error) { } // GetEncoder returns an io.Reader for reading the bin content -func (bin *Bin) GetEncoder() io.Reader { +func (bin *Bin) GetEncoder() io.ReadCloser { return NewEncoder(bin) } @@ -268,6 +269,8 @@ type Encoder struct { eop bool // Set to true when a part is completely read. eob bool // Set to true when when the bin is completely read. handle sts.Readable + mux sync.RWMutex + closed bool } // NewEncoder returns a new BinWriter instance @@ -316,6 +319,11 @@ func (b *Encoder) startNextPart() error { // Read reads the next bit of bytes from the current file part func (b *Encoder) Read(p []byte) (n int, err error) { + b.mux.RLock() + defer b.mux.RUnlock() + if b.closed { + return 0, io.EOF + } if b.partIndex == 0 { b.bin.setStarted() } @@ -358,6 +366,16 @@ func (b *Encoder) Read(p []byte) (n int, err error) { return } +func (b *Encoder) Close() error { + b.mux.Lock() + defer b.mux.Unlock() + if b.handle != nil { + b.handle.Close() + } + b.closed = true + return nil +} + // PartDecoder is responsible for parsing individual "parts" of "bin" requests type PartDecoder struct { meta *fileMeta diff --git a/stage/local.go b/stage/local.go index 831039c..2833cc7 100644 --- a/stage/local.go +++ b/stage/local.go @@ -36,6 +36,7 @@ const ( // nValidators is the number of concurrent goroutines doing hash validation nValidators = 24 + stateUnknown = -1 stateReceived = 0 stateValidated = 1 stateFailed = 2 @@ -127,10 +128,19 @@ func New(name, rootDir, targetDir string, logger sts.ReceiveLogger, dispatcher s s.cleanInterval = time.Minute * 30 s.canReceive = true s.scheduleClean() + s.validateCh = make(chan *finalFile, 100) + // Let's not have too 
many files processed at once + for i := 0; i < nValidators; i++ { + go s.processHandler() + } + s.finalizeCh = make(chan *finalFile, 100) + go s.finalizeHandler() return s } func (s *Stage) getPathLock(key string) *sync.RWMutex { + // s.logDebug("Getting path lock for:", key) + // defer s.logDebug("Got path lock for:", key) s.pathLock.Lock() defer s.pathLock.Unlock() var m *sync.RWMutex @@ -143,6 +153,8 @@ func (s *Stage) getPathLock(key string) *sync.RWMutex { } func (s *Stage) delPathLock(key string) { + // s.logDebug("Deleting path lock for:", key) + // defer s.logDebug("Deleted path lock for:", key) s.pathLock.Lock() defer s.pathLock.Unlock() delete(s.pathLocks, key) @@ -215,10 +227,10 @@ func (s *Stage) initStageFile(path string, size int64) error { return nil } if _, err = os.Stat(path + compExt); !os.IsNotExist(err) { - cached := s.fromCache(path) + cachedState := s.getFileState(path) // In case some catastrophe causes the sender to keep sending the same // file, at least we won't be clobbering legitimate companion files. 
- if cached == nil || cached.state == stateFailed { + if cachedState == stateUnknown || cachedState == stateFailed { s.logDebug("Removing Stale Companion:", path+compExt) os.Remove(path + compExt) } @@ -236,8 +248,7 @@ func (s *Stage) initStageFile(path string, size int64) error { path, partExt, size, err.Error()) } defer fh.Close() - fh.Truncate(size) - return nil + return fh.Truncate(size) } // Prepare is called with all binned parts of a request before each one is @@ -246,9 +257,11 @@ func (s *Stage) Prepare(parts []sts.Binned) { for _, part := range parts { path := filepath.Join(s.rootDir, part.GetName()) lock := s.getPathLock(path) + s.logDebug("Preparing:", path) lock.Lock() err := s.initStageFile(path, part.GetFileSize()) lock.Unlock() + s.logDebug("Prepared:", path) if err != nil { s.logError(err.Error()) } @@ -273,7 +286,9 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) { err.Error()) return } - fh.Seek(part.Beg, 0) + if _, err = fh.Seek(part.Beg, 0); err != nil { + return + } _, err = io.Copy(fh, reader) fh.Close() if err != nil { @@ -281,6 +296,8 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) { } // Make sure we're the only one updating the companion + s.logDebug("Receiving part:", file.Source, file.Name, part.Beg, part.End) + defer s.logDebug("Received part:", file.Source, file.Name, part.Beg, part.End) lock := s.getPathLock(path) lock.Lock() defer lock.Unlock() @@ -301,7 +318,7 @@ func (s *Stage) Receive(file *sts.Partial, reader io.Reader) (err error) { return } - s.logDebug("Part received:", file.Source, file.Name, part.Beg, part.End) + s.logDebug("Wrote part:", file.Source, file.Name, part.Beg, part.End) done := isCompanionComplete(cmp) if done { @@ -355,13 +372,14 @@ func (s *Stage) partReceived(part sts.Binned) bool { beg, end := part.GetSlice() path := filepath.Join(s.rootDir, part.GetName()) lock := s.getPathLock(path) + // s.logDebug("Checking for received part:", path) + // defer 
s.logDebug("Checked for received part:", path) lock.Lock() defer lock.Unlock() final := &finalFile{ path: path, renamed: part.GetRenamed(), name: part.GetName(), - size: part.GetFileSize(), hash: part.GetFileHash(), prev: part.GetPrev(), } @@ -402,30 +420,28 @@ func (s *Stage) GetFileStatus(relPath string, sent time.Time) int { s.logDebug("Stage polled:", sent, relPath) s.buildCache(sent) path := filepath.Join(s.rootDir, relPath) - f := s.fromCache(path) - if f != nil { - switch f.state { - case stateReceived: - s.logDebug("Stage:", relPath, "(received)") - return sts.ConfirmNone - case stateFailed: - s.logDebug("Stage:", relPath, "(failed)") - return sts.ConfirmFailed - case stateValidated: - file := s.getWaiting(path) - if file != nil { - s.logDebug("Stage:", relPath, "(waiting)") - return sts.ConfirmWaiting - } - s.logDebug("Stage:", relPath, "(done)") - return sts.ConfirmPassed - case stateLogged: - s.logDebug("Stage:", relPath, "(logged)") - return sts.ConfirmPassed - case stateFinalized: - s.logDebug("Stage:", relPath, "(done)") - return sts.ConfirmPassed + state := s.getFileState(path) + switch state { + case stateReceived: + s.logDebug("Stage:", relPath, "(received)") + return sts.ConfirmNone + case stateFailed: + s.logDebug("Stage:", relPath, "(failed)") + return sts.ConfirmFailed + case stateValidated: + file := s.getWaiting(path) + if file != nil { + s.logDebug("Stage:", relPath, "(waiting)") + return sts.ConfirmWaiting } + s.logDebug("Stage:", relPath, "(done)") + return sts.ConfirmPassed + case stateLogged: + s.logDebug("Stage:", relPath, "(logged)") + return sts.ConfirmPassed + case stateFinalized: + s.logDebug("Stage:", relPath, "(done)") + return sts.ConfirmPassed } s.logDebug("Stage:", relPath, "(not found)") return sts.ConfirmNone @@ -650,11 +666,13 @@ func (s *Stage) cleanStrays(minAge time.Duration) { s.logDebug("Checking for stray partial:", relPath, compExists, comp != nil) delete := false deleteCmp := false - file := 
s.fromCache(strings.TrimSuffix(path, partExt)) - if file != nil && file.state > stateReceived { - delete = comp == nil || comp.Hash == file.hash - deleteCmp = compExists && file.state == stateLogged - s.logDebug("Stray partial cache info:", relPath, file.state, file.hash) + filePath := strings.TrimSuffix(path, partExt) + fileState := s.getFileState(filePath) + fileHash := s.getFileHash(filePath) + if fileState > stateReceived { + delete = comp == nil || comp.Hash == fileHash + deleteCmp = compExists && fileState == stateLogged + s.logDebug("Stray partial cache info:", relPath, fileState, fileHash) } else { end := time.Now() beg := info.ModTime().Add(time.Duration(age.Minutes()) * time.Hour * -1) @@ -693,6 +711,7 @@ func (s *Stage) cleanWaiting() { s.logDebug("Looking for wait loops ...") var waiting []*finalFile s.cacheLock.RLock() + defer s.cacheLock.RUnlock() for _, cacheFile := range s.cache { if cacheFile.state != stateValidated || cacheFile.prev == "" { @@ -700,7 +719,6 @@ func (s *Stage) cleanWaiting() { } waiting = append(waiting, cacheFile) } - s.cacheLock.RUnlock() sort.Slice(waiting, func(i, j int) bool { return waiting[i].time.Before(waiting[j].time) }) @@ -714,7 +732,6 @@ func (s *Stage) cleanWaiting() { if len(loop) == 0 { continue } - loop = nil for _, waitFile := range s.fromWait(prevPath) { s.logInfo("Removing wait loop:", waitFile.name, "<-", waitFile.prev) f := s.fromCache(waitFile.path) @@ -762,15 +779,8 @@ func (s *Stage) partialToFinal(file *sts.Partial) *finalFile { } func (s *Stage) processQueue(file *finalFile) { - if s.validateCh == nil { - s.validateCh = make(chan *finalFile, 100) - // Let's not have too many files processed at once - for i := 0; i < nValidators; i++ { - go s.processHandler() - } - } - s.logDebug("Pushing onto validate chan:", file.name) - defer s.logDebug("Pushed onto validate chan:", file.name) + // s.logDebug("Pushing onto validate chan:", file.name) + // defer s.logDebug("Pushed onto validate chan:", file.name) 
s.validateCh <- file } @@ -781,15 +791,15 @@ func (s *Stage) processHandler() { } func (s *Stage) process(file *finalFile) { - - s.logDebug("Validating:", file.name) + // s.logDebug("Validating:", file.name) + // defer s.logDebug("Validated:", file.name) fileLock := s.getPathLock(file.path) fileLock.Lock() defer fileLock.Unlock() - existing := s.fromCache(file.path) - if existing == nil || existing.state != stateReceived { + existingState := s.getFileState(file.path) + if existingState == stateUnknown || existingState != stateReceived { s.logDebug("Ignoring invalid (process):", file.name) return } @@ -825,15 +835,12 @@ func (s *Stage) process(file *finalFile) { s.toCache(file, stateValidated) - s.logDebug("Validated:", file.name) go s.finalizeQueue(file) } func (s *Stage) finalizeQueue(file *finalFile) { - if s.finalizeCh == nil { - s.finalizeCh = make(chan *finalFile, 100) - go s.finalizeHandler() - } + s.logDebug("Pushing onto finalize chan:", file.name) + defer s.logDebug("Pushed onto finalize chan:", file.name) s.finalizeCh <- file } @@ -841,7 +848,7 @@ func (s *Stage) finalizeHandler() { defer s.logDebug("Finalize channel done:") for f := range s.finalizeCh { s.logDebug("Finalize chain:", f.name) - if cached := s.fromCache(f.path); cached == nil || cached.state != stateValidated { + if state := s.getFileState(f.path); state != stateValidated { // Skip redundancies or mistakes in the pipe s.logDebug("Already finalized or not ready:", f.name) continue @@ -853,14 +860,14 @@ func (s *Stage) finalizeHandler() { } func (s *Stage) isFileReady(file *finalFile) bool { - if file.prev == "" { + if file.prev == "" || file.prev == file.name { return true } var waitTime time.Duration prevPath := filepath.Join(s.rootDir, file.prev) - prev := s.fromCache(prevPath) - switch { - case prev == nil: + prevState := s.getFileState(prevPath) + switch prevState { + case stateUnknown: if s.hasPathLock(prevPath) { s.logDebug("Previous file in progress:", file.name, "<-", file.prev) 
break @@ -896,15 +903,15 @@ func (s *Stage) isFileReady(file *finalFile) bool { file.name, "<-", file.prev, "--", beg.Format(tfmt), "-", end.Format(tfmt), took) waitTime = time.Second * 10 - case prev.state == stateReceived: + case stateReceived: s.logDebug("Waiting for previous file:", s.name, file.name, "<-", file.prev) - case prev.state == stateFailed: + case stateFailed: s.logDebug("Previous file failed:", s.name, file.name, "<-", file.prev) - case prev.state == stateValidated: + case stateValidated: if s.isWaiting(prevPath) { s.logDebug("Previous file waiting:", file.name, "<-", file.prev) } else { @@ -921,13 +928,14 @@ func (s *Stage) isFileReady(file *finalFile) bool { func (s *Stage) finalize(file *finalFile) { fileLock := s.getPathLock(file.path) + // s.logDebug("Finalizing prep", file.name) fileLock.Lock() defer s.delPathLock(file.path) defer fileLock.Unlock() - existing := s.fromCache(file.path) - if existing == nil || existing.state != stateValidated { - s.logDebug("Ignoring invalid (final):", file.name, existing.state) + existingState := s.getFileState(file.path) + if existingState != stateValidated { + s.logDebug("Ignoring invalid (final):", file.name, existingState) return } @@ -959,6 +967,8 @@ func (s *Stage) finalize(file *finalFile) { } func (s *Stage) putFileAway(file *finalFile) (targetPath string, err error) { + s.logDebug("Putting file away:", file.name) + defer s.logDebug("Put away:", file.name) // Better to log the file twice rather than receive it twice. 
If we log // after putting the file away, it's possible that a crash could occur @@ -973,7 +983,9 @@ func (s *Stage) putFileAway(file *finalFile) (targetPath string, err error) { targetName = file.renamed } targetPath = filepath.Join(s.targetDir, targetName) - os.MkdirAll(filepath.Dir(targetPath), 0775) + if err = os.MkdirAll(filepath.Dir(targetPath), 0775); err != nil { + return + } if err = fileutil.Move(file.path+waitExt, targetPath); err != nil { // If the file doesn't exist then something is really wrong. // Either we somehow have two instances running that are stepping @@ -1064,6 +1076,8 @@ func (s *Stage) getWaiting(path string) *finalFile { // fromWait returns the file(s) currently waiting on the file indicated by path func (s *Stage) fromWait(prevPath string) []*finalFile { + // s.logDebug("Wait reading", prevPath) + // defer s.logDebug("Wait read", prevPath) s.waitLock.Lock() defer s.waitLock.Unlock() files, ok := s.wait[prevPath] @@ -1076,6 +1090,8 @@ func (s *Stage) fromWait(prevPath string) []*finalFile { } func (s *Stage) toWait(prevPath string, next *finalFile, howLong time.Duration) { + // s.logDebug("Wait updating", prevPath, "->", next.path) + // defer s.logDebug("Wait updated", prevPath, "->", next.path) s.waitLock.Lock() defer s.waitLock.Unlock() if next.wait != nil { @@ -1108,11 +1124,31 @@ func (s *Stage) toWait(prevPath string, next *finalFile, howLong time.Duration) } func (s *Stage) fromCache(path string) *finalFile { + // s.logDebug("Cache reading", path) + // defer s.logDebug("Cache read", path) s.cacheLock.RLock() defer s.cacheLock.RUnlock() return s.cache[path] } +func (s *Stage) getFileState(path string) int { + s.cacheLock.RLock() + defer s.cacheLock.RUnlock() + if f, ok := s.cache[path]; ok { + return f.state + } + return stateUnknown +} + +func (s *Stage) getFileHash(path string) string { + s.cacheLock.RLock() + defer s.cacheLock.RUnlock() + if f, ok := s.cache[path]; ok { + return f.hash + } + return "" +} + func (s *Stage) 
inPipe() int { s.cacheLock.RLock() defer s.cacheLock.RUnlock() @@ -1120,6 +1156,8 @@ func (s *Stage) inPipe() int { } func (s *Stage) toCache(file *finalFile, state int) { + // s.logDebug("Caching", file.path, state) + // defer s.logDebug("Cached", file.path, state) s.cacheLock.Lock() defer s.cacheLock.Unlock() file.time = time.Now() @@ -1161,6 +1199,8 @@ func (s *Stage) buildCache(from time.Time) { }(s, from) { return } + // s.logDebug("Building cache from logs:", from) + // defer s.logDebug("Built cache from logs:", from) s.cacheLock.Lock() defer s.cacheLock.Unlock() cacheTime := s.cacheTime @@ -1211,12 +1251,14 @@ func (s *Stage) getCacheStartTime() time.Time { // func (s *Stage) clearCache() { // s.cacheLock.Lock() // defer s.cacheLock.Unlock() -// s.cache = make(map[string]*finalFile) +// s.cache = make(map[string]finalFile) // s.cacheTimes = nil // s.cacheTime = time.Time{} // } func (s *Stage) cleanCache() { + // s.logDebug("Cleaning cache") + // defer s.logDebug("Cleaned cache") s.cacheLock.Lock() defer s.cacheLock.Unlock() var age time.Duration @@ -1234,34 +1276,35 @@ func (s *Stage) cleanCache() { } s.cacheTime = time.Now() for _, cacheFile := range s.cache { - if cacheFile.state >= stateFinalized { - if cacheFile.prev != "" && !cacheFile.nextFinal { - continue - } - age = time.Since(cacheFile.logged) - if age > cacheAgeLogged { - if len(batches) > 0 { - for _, t := range batches { - if t.Equal(cacheFile.time) && cacheFile.state == stateLogged { - // Delete if file was loaded via a batch that is - // ready to be expired - goto delete - } + if cacheFile.state < stateFinalized { + continue + } + if cacheFile.prev != "" && !cacheFile.nextFinal { + continue + } + age = time.Since(cacheFile.logged) + if age > cacheAgeLogged { + if len(batches) > 0 { + for _, t := range batches { + if t.Equal(cacheFile.time) && cacheFile.state == stateLogged { + // Delete if file was loaded via a batch that is + // ready to be expired + goto delete } - goto keep } - delete: - 
delete(s.cache, cacheFile.path) - s.logDebug("Removed from cache:", cacheFile.name) - continue - } - keep: - // We want the source cacheTime to be the earliest logged time of - // the files still in the cache (except the extra ones we keep - // around to maintain the chain) - if cacheFile.logged.Before(s.cacheTime) { - s.cacheTime = cacheFile.logged + goto keep } + delete: + delete(s.cache, cacheFile.path) + s.logDebug("Removed from cache:", cacheFile.name) + continue + } + keep: + // We want the source cacheTime to be the earliest logged time of + // the files still in the cache (except the extra ones we keep + // around to maintain the chain) + if cacheFile.logged.Before(s.cacheTime) { + s.cacheTime = cacheFile.logged } } s.logDebug("Cache Count:", len(s.cache)) diff --git a/sts.go b/sts.go index 92db7a0..2791187 100644 --- a/sts.go +++ b/sts.go @@ -319,7 +319,7 @@ type Payload interface { GetSize() int64 GetParts() []Binned EncodeHeader() ([]byte, error) - GetEncoder() io.Reader + GetEncoder() io.ReadCloser GetStarted() time.Time GetCompleted() time.Time }