Skip to content

Commit

Permalink
Fixes race conditions and adds some new features:
Browse files Browse the repository at this point in the history
- option to simulate HTTP failures (needed for testing unstable networks)
- option to change the rate at which retries are done (error backoff)
- fixes bug where outgoing files were logged multiple times (see line 1094 of client/client.go) - payload = nil
- adds raw bandwidth logging that includes all network traffic sent/received by client
- ability to restart clients via internal server request
- various fixes to accommodate linting complaints
- race conditions in payload/bin.go and stage/local.go
  • Loading branch information
sbeus committed Aug 29, 2024
1 parent 5860284 commit 6bb9377
Show file tree
Hide file tree
Showing 19 changed files with 648 additions and 235 deletions.
121 changes: 97 additions & 24 deletions .assets/test/test-glut.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,30 @@ if [ ! -z "$serverdebug" ]; then
mkdir -p $(dirname $serveroutput)
fi

cmd_client1="$bin$VCLIENT1 $clientdebug--mode=out --loop"
cmd_client2="$bin$VCLIENT2 $clientdebug--mode=out --loop"
iport=1990

cmd_client1="$bin$VCLIENT1 $clientdebug--mode=out --loop --iport=$iport"
cmd_client2="$bin$VCLIENT2 $clientdebug--mode=out --loop --iport=$iport"

cmd_server1="$bin$VSERVER1 $serverdebug--mode=in"
cmd_server2="$bin$VSERVER2 $serverdebug--mode=in"

sim=(
"stsin-1 xs 100 5000 0"
"stsin-1 xl 100000000 2 0"
"stsin-2 lg 10000000 20 0"
"stsin-2 md 100000 500 0"
"stsin-2 sm 10000 1000 0"
)
for args in "${sim[@]}"; do
echo "Making data: $args ..."
$PWD/$basedir/makedata.py $args
done
# Generate the simulated input data sets by invoking makedata.py once per
# spec line.  NOTE(review): spec fields appear to be
# "<target> <label> <size> <count> <flag>" per makedata.py — confirm there.
# The sim array is deliberately left global (no `local`) to match prior use.
function makedata() {
    sim=(
        "stsin-1 xs 100 10000 0"
        "stsin-1 xl 100000000 4 0"
        "stsin-2 lg 10000000 40 0"
        "stsin-2 md 100000 1000 0"
        "stsin-2 sm 10000 2000 0"
    )
    local spec
    for spec in "${sim[@]}"; do
        echo "Making data: $spec ..."
        $PWD/$basedir/makedata.py $spec
    done
}

makedata &
makedata_pid=$!

echo "Running server ..."
$cmd_server1 > $serveroutput &
Expand All @@ -56,7 +63,22 @@ pid_server=$!
echo "Running client ..."
$cmd_client1 > $clientoutput &
pid_client=$!
pids=( "$pid_client" "$pid_server" )
pids=( "$pid_client" "$pid_server" "$makedata_pid" )

# Periodically (every 10 * $sleepfactor seconds) print the resident memory
# usage of the client and server processes.  Runs forever; the caller
# backgrounds it and kills it by PID when done.
function memory() {
    while true; do
        sleep $(( 10 * $sleepfactor ))
        echo ""
        echo "Memory Usage:"
        # ps emits RSS (in KB) plus the full command line; awk skips the
        # header row (NR>1) and converts KB -> GB for readability.
        ps -o rss,args -p $pid_client $pid_server \
            | awk 'NR>1 {printf "%.2f GB %s\n", $1/1024/1024, substr($0, index($0,$2))}'
        echo ""
    done
}

memory &
memory_pid=$!
pids+=($memory_pid)

function monkey() {
count_stg=0
Expand Down Expand Up @@ -94,23 +116,48 @@ function ctrl_c() {
exit 0
}

# Ask the running client(s) to restart themselves gracefully via the
# internal HTTP control endpoint listening on $iport.
function cleanrestart() {
    echo "Doing a 'clean' client restart ..."
    curl -XPUT "http://localhost:$iport/restart-clients"
    echo "Client(s) restarted!"
}

# Handle interrupt signal
trap ctrl_c INT

# Sleep for a bit to send some of the data
sleep $(( 20 * $sleepfactor ))
# Sleep until the count of outgoing files goes down
countprev=0
while true; do
sleep $(( 5 * $sleepfactor ))
out=`find $STS_HOME/data/out -type f 2>/dev/null | sort`
if [ "$out" ]; then
lines=($out)
count=${#lines[@]}
# break if the count is going down
if (( countprev > 0 && count < countprev )); then
break
fi
countprev=$count
fi
done

echo "Stopping ..."

# Kill and restart so we can test a glut on start-up
kill -9 $pid_client
kill -9 $pid_server
kill $monkey_pid > /dev/null 2>&1
kill $memory_pid > /dev/null 2>&1

# Make more data
for args in "${sim[@]}"; do
echo "Making data: $args ..."
$PWD/$basedir/makedata.py $args
# Wait until PIDs are gone
while true; do
sleep $(( 1 * $sleepfactor ))
for pid in "${pids[@]}"; do
if ps -p $pid > /dev/null 2>&1; then
continue
fi
break 2
done
done

echo "Restarting server ..."
Expand All @@ -127,12 +174,28 @@ echo "Restarting monkey ..."
monkey &
monkey_pid=$!

pids=( "$pid_client" "$pid_server" "$monkey_pid" )
echo "Restarting memory monitor ..."
memory &
memory_pid=$!

# Make more data
makedata &
makedata_pid=$!

echo "Waiting ..."
pids=( "$pid_client" "$pid_server" "$monkey_pid" "$memory_pid" "$makedata_pid" )

restarttime=$(date +%s)
restartinterval=60

# Wait for the client to be done
while true; do
now=$(date +%s)
elapsed=$((now - restarttime))
echo "Elapsed: $elapsed"
if (( elapsed > restartinterval )); then
cleanrestart
restarttime=$(date +%s)
fi
sleep $(( 5 * $sleepfactor ))
out=`find $STS_HOME/data/out -type f 2>/dev/null | sort`
if [ "$out" ]; then
Expand All @@ -146,7 +209,7 @@ while true; do
done

# Trigger a stage cleanup manually
curl 'localhost:1992/clean?block&minage=1'
# curl 'localhost:1992/clean?block&minage=1'

while true; do
sleep $(( 1 * $sleepfactor ))
Expand All @@ -157,6 +220,7 @@ while true; do
continue
fi
kill $pid_server
kill $memory_pid > /dev/null 2>&1
echo "Done!"
break
done
Expand Down Expand Up @@ -197,12 +261,21 @@ if [ "$stale" ]; then
fi

sortd=`cut -d ":" -f 1 $STS_HOME/data/log/incoming_from/*/*/* | sort`
uniqd=`uniq -c <(cat <(echo "$sortd")) | grep -v " 1"`
uniqd=`uniq -c <(cat <(echo "$sortd")) | grep -v " 1 "`
if [ "$uniqd" ]; then
echo "Incoming Duplicates (probably OK):"
cat <(echo "$uniqd")
echo "SUCCESS?"
exit 0
fi

sortd=`cut -d ":" -f 1 $STS_HOME/data/log/outgoing_to/*/*/* | sort`
uniqd=`uniq -c <(cat <(echo "$sortd")) | grep -v " 1 "`
if [ "$uniqd" ]; then
echo "Outgoing Duplicates:"
cat <(echo "$uniqd")
echo "FAILED!"
exit 0
fi

echo "SUCCESS!"
72 changes: 37 additions & 35 deletions .assets/test/test4.client.yaml
Original file line number Diff line number Diff line change
@@ -1,42 +1,44 @@
OUT: # Outgoing configuration
dirs: # Outgoing directory configuration
cache : .sts/out # Used to store internal caches
logs : data/log
out : data/out # Directory to watch for files to send; appends "/{target}" unless overridden
out-follow : true
cache: .sts/out # Used to store internal caches
logs: data/log
out: data/out # Directory to watch for files to send; appends "/{target}" unless overridden
out-follow: true
sources: # Supports multiple sources where omitted entries will inherit from previous sources hierarchically
- name : stsout-1 # Name of the source
threads : 8 # Maximum number of concurrent connections
scan-delay : 10s
cache-age : 5m
min-age : 5s # How old a file must be before being added to the "outgoing" queue
max-age : 0s # How old a file can be before getting logged as "stale" (remains in the queue)
bin-size : 20MB # The generally-desired size for a given HTTP request (BEFORE any compression)
compress : 0 # Use GZIP compression level (0-9) (NOTE: bin-size is based on file size BEFORE compression)
stat-payload : true # Log payload transmission stats
poll-delay : 2s # How long to wait after file sent before final validation
poll-interval : 5s # How long to wait between polling requests
poll-attempts : 1 # How many times to "poll" for the successful reception of a file before re-sending
- name: stsout-1 # Name of the source
threads: 16 # Maximum number of concurrent connections
scan-delay: 10s
cache-age: 5m
min-age: 5s # How old a file must be before being added to the "outgoing" queue
max-age: 0s # How old a file can be before getting logged as "stale" (remains in the queue)
bin-size: 20MB # The generally-desired size for a given HTTP request (BEFORE any compression)
compress: 0 # Use GZIP compression level (0-9) (NOTE: bin-size is based on file size BEFORE compression)
stat-payload: true # Log payload transmission stats
stat-interval: 30s # How often to log payload transmission stats
poll-delay: 2s # How long to wait after file sent before final validation
poll-interval: 5s # How long to wait between polling requests
poll-attempts: 1 # How many times to "poll" for the successful reception of a file before re-sending
error-backoff: 0.1 # Multiplier for how long to wait before retrying after an error
target: # Target-specific configuration
name : stsin-1 # Name of the target
http-host : localhost:1992
name: stsin-1 # Name of the target
http-host: localhost:1992
tags: # Tags are for configuration based on file patterns (omitted attributes are inherited)
- pattern : DEFAULT # The file "tag" pattern
priority : 1 # Relative importance (higher the number, greater the importance)
order : fifo # File order (fifo (first in, first out), lifo, or none)
delete : true # Whether or not to delete files after reception confirmation
delete-delay : 30s
method : http
- pattern : ^xs/
priority : 3
- pattern : ^sm/
priority : 2
- pattern : ^xl/
priority : 0
- pattern: DEFAULT # The file "tag" pattern
priority: 1 # Relative importance (higher the number, greater the importance)
order: fifo # File order (fifo (first in, first out), lifo, or none)
delete: true # Whether or not to delete files after reception confirmation
delete-delay: 30s
method: http
- pattern: ^xs/
priority: 3
- pattern: ^sm/
priority: 2
- pattern: ^xl/
priority: 0
rename:
- from: '^(?P<pfx>[a-z]{2})/(?P<rest>.+)$'
to: '{{.pfx}}/{{.rest}}.ext'
- name : stsout-2
out-dir : data/out/stsin-2 # Have to override when running on the same host since the target is the same
- from: "^(?P<pfx>[a-z]{2})/(?P<rest>.+)$"
to: "{{.pfx}}/{{.rest}}.ext"
- name: stsout-2
out-dir: data/out/stsin-2 # Have to override when running on the same host since the target is the same
target:
name : stsin-1
name: stsin-1
25 changes: 13 additions & 12 deletions .assets/test/test4.server.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
IN: # Incoming configuration.
log-buffering: true # Set to true only when potentially redundant transfers are not critical
sources:
- stsout-1
- stsout-2
dirs: # Incoming directory configuration.
stage : data/stage # Directory to stage data as it comes in; appends "source" name
final : data/in # Final location for incoming files; appends "/{source}"
logs : data/log # Root directory for log files
server: # Server configuration.
http-host : localhost
http-port : 1992 # What port to listen on
compress : 0 # Use GZIP compression level (0-9) on response data
log-buffering: true # Set to true only when potentially redundant transfers are not critical
sources:
- stsout-1
- stsout-2
dirs: # Incoming directory configuration.
stage: data/stage # Directory to stage data as it comes in; appends "source" name
final: data/in # Final location for incoming files; appends "/{source}"
logs: data/log # Root directory for log files
server: # Server configuration.
http-host: localhost
http-port: 1992 # What port to listen on
compress: 0 # Use GZIP compression level (0-9) on response data
chance-of-simulated-failure: 0.10 # Chance of simulated network failure
19 changes: 19 additions & 0 deletions cache/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,48 @@ type cacheFile struct {
Meta []byte `json:"meta"`
Hash string `json:"hash"`
Done bool `json:"done"`
mux sync.RWMutex
}

// GetPath returns the file's full path, guarded by the read lock.
func (f *cacheFile) GetPath() string {
	f.mux.RLock()
	p := f.path
	f.mux.RUnlock()
	return p
}

// GetName returns the file's name, guarded by the read lock.
func (f *cacheFile) GetName() string {
	f.mux.RLock()
	n := f.name
	f.mux.RUnlock()
	return n
}

// GetSize returns the file's size in bytes, guarded by the read lock.
func (f *cacheFile) GetSize() int64 {
	f.mux.RLock()
	s := f.Size
	f.mux.RUnlock()
	return s
}

// GetTime returns the file's timestamp, guarded by the read lock.
func (f *cacheFile) GetTime() time.Time {
	f.mux.RLock()
	t := f.Time.Time
	f.mux.RUnlock()
	return t
}

// GetMeta returns the file's metadata bytes, guarded by the read lock.
// NOTE(review): only the slice header is copied; the backing array is
// still shared with the cache entry, so callers must not mutate it —
// confirm against callers if full isolation is needed.
func (f *cacheFile) GetMeta() []byte {
	f.mux.RLock()
	m := f.Meta
	f.mux.RUnlock()
	return m
}

// GetHash returns the file's hash string, guarded by the read lock.
func (f *cacheFile) GetHash() string {
	f.mux.RLock()
	h := f.Hash
	f.mux.RUnlock()
	return h
}

// IsDone reports whether the file has been marked done, guarded by the
// read lock.
func (f *cacheFile) IsDone() bool {
	f.mux.RLock()
	d := f.Done
	f.mux.RUnlock()
	return d
}

Expand Down Expand Up @@ -128,7 +143,9 @@ func (j *JSON) Done(key string, whileLocked func(sts.Cached)) {
return
}
j.dirty = true
f.(*cacheFile).mux.Lock()
f.(*cacheFile).Done = true
f.(*cacheFile).mux.Unlock()
if whileLocked != nil {
whileLocked(f)
}
Expand All @@ -142,7 +159,9 @@ func (j *JSON) Reset(key string) {
}
j.mutex.Lock()
defer j.mutex.Unlock()
f.(*cacheFile).mux.Lock()
f.(*cacheFile).Hash = ""
f.(*cacheFile).mux.Unlock()
j.dirty = true
}

Expand Down
Loading

0 comments on commit 6bb9377

Please sign in to comment.