Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emran/debug test failure #2818

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### General
- \#2758 Accept only active Os to receive traffic and redeem tickets (@leszko)
- \#2775 Reduce number of ETH RPC calls during block polling (@leszko)
- \#2815 Add new logging methods to publish a set of public logs (@emranemran)

#### Broadcaster

Expand Down
54 changes: 43 additions & 11 deletions clog/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ type clogContextKeyT struct{}
var clogContextKey = clogContextKeyT{}

const (
ClientIP = "clientIP"
ClientIP = "clientIP"
publicLogTag = "[PublicLogs] "

// standard keys
manifestID = "manifestID"
Expand All @@ -35,6 +36,7 @@ type Verbose bool

var stdKeys map[string]bool
var stdKeysOrder = []string{manifestID, sessionID, nonce, seqNo, orchSessionID}
var publicLogKeys = []string{manifestID, sessionID, orchSessionID, ClientIP}

func init() {
stdKeys = make(map[string]bool)
Expand Down Expand Up @@ -123,27 +125,27 @@ func GetVal(ctx context.Context, key string) string {
}

func Warningf(ctx context.Context, format string, args ...interface{}) {
msg, _ := formatMessage(ctx, false, format, args...)
msg, _ := formatMessage(ctx, false, false, format, args...)
glog.WarningDepth(1, msg)
}

func Errorf(ctx context.Context, format string, args ...interface{}) {
msg, _ := formatMessage(ctx, false, format, args...)
msg, _ := formatMessage(ctx, false, false, format, args...)
glog.ErrorDepth(1, msg)
}

func Fatalf(ctx context.Context, format string, args ...interface{}) {
msg, _ := formatMessage(ctx, false, format, args...)
msg, _ := formatMessage(ctx, false, false, format, args...)
glog.FatalDepth(1, msg)
}

func Infof(ctx context.Context, format string, args ...interface{}) {
infof(ctx, false, format, args...)
infof(ctx, false, false, format, args...)
}

// InfofErr if last argument is not nil it will be printed as " err=%q"
func InfofErr(ctx context.Context, format string, args ...interface{}) {
infof(ctx, true, format, args...)
infof(ctx, true, false, format, args...)
}

func V(level glog.Level) Verbose {
Expand All @@ -154,7 +156,7 @@ func V(level glog.Level) Verbose {
// See the documentation of V for usage.
func (v Verbose) Infof(ctx context.Context, format string, args ...interface{}) {
if v {
infof(ctx, false, format, args...)
infof(ctx, false, false, format, args...)
}
}

Expand All @@ -164,12 +166,12 @@ func (v Verbose) InfofErr(ctx context.Context, format string, args ...interface{
err = args[len(args)-1]
}
if v || err != nil {
infof(ctx, true, format, args...)
infof(ctx, true, false, format, args...)
}
}

func infof(ctx context.Context, lastErr bool, format string, args ...interface{}) {
msg, isErr := formatMessage(ctx, lastErr, format, args...)
func infof(ctx context.Context, lastErr bool, publicLog bool, format string, args ...interface{}) {
msg, isErr := formatMessage(ctx, lastErr, publicLog, format, args...)
if isErr {
glog.ErrorDepth(2, msg)
} else {
Expand Down Expand Up @@ -205,8 +207,11 @@ func messageFromContext(ctx context.Context, sb *strings.Builder) {
cmap.mu.RUnlock()
}

func formatMessage(ctx context.Context, lastErr bool, format string, args ...interface{}) (string, bool) {
func formatMessage(ctx context.Context, lastErr bool, publicLog bool, format string, args ...interface{}) (string, bool) {
var sb strings.Builder
if publicLog {
sb.WriteString(publicLogTag)
}
messageFromContext(ctx, &sb)
var err interface{}
if lastErr && len(args) > 0 {
Expand All @@ -219,3 +224,30 @@ func formatMessage(ctx context.Context, lastErr bool, format string, args ...int
}
return sb.String(), err != nil
}

func PublicInfof(ctx context.Context, format string, args ...interface{}) {
publicCtx := context.Background()

publicCtx = PublicCloneCtx(ctx, publicCtx, publicLogKeys)

infof(publicCtx, false, true, format, args...)
}

// PublicCloneCtx creates a new context but only copies key/val pairs from the original context
// that are allowed to be published publicly (i.e. list in []publicLogKeys
func PublicCloneCtx(originalCtx context.Context, publicCtx context.Context, publicLogKeys []string) context.Context {
cmap, _ := originalCtx.Value(clogContextKey).(*values)
publicCmap := newValues()
if cmap != nil {
cmap.mu.RLock()
for k, v := range cmap.vals {
for _, key := range publicLogKeys {
if key == k {
publicCmap.vals[k] = v
}
}
}
cmap.mu.RUnlock()
}
return context.WithValue(publicCtx, clogContextKey, publicCmap)
}
45 changes: 40 additions & 5 deletions clog/clog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,61 @@ func TestStdKeys(t *testing.T) {
ctx = AddOrchSessionID(ctx, "orchID")
ctx = AddSeqNo(ctx, 9427)
ctx = AddVal(ctx, "customKey", "customVal")
msg, _ := formatMessage(ctx, false, "testing message num=%d", 452)
msg, _ := formatMessage(ctx, false, false, "testing message num=%d", 452)
assert.Equal("manifestID=manID sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID customKey=customVal testing message num=452", msg)
ctxCloned := Clone(context.Background(), ctx)
ctxCloned = AddManifestID(ctxCloned, "newManifest")
msgCloned, _ := formatMessage(ctxCloned, false, "testing message num=%d", 4521)
msgCloned, _ := formatMessage(ctxCloned, false, false, "testing message num=%d", 4521)
assert.Equal("manifestID=newManifest sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID customKey=customVal testing message num=4521", msgCloned)
// old context shouldn't change
msg, _ = formatMessage(ctx, false, "testing message num=%d", 452)
msg, _ = formatMessage(ctx, false, false, "testing message num=%d", 452)
assert.Equal("manifestID=manID sessionID=sessionID nonce=1038 seqNo=9427 orchSessionID=orchID customKey=customVal testing message num=452", msg)
}

func TestLastErr(t *testing.T) {
assert := assert.New(t)
ctx := AddManifestID(context.Background(), "manID")
var err error
msg, isErr := formatMessage(ctx, true, "testing message num=%d", 452, err)
msg, isErr := formatMessage(ctx, true, false, "testing message num=%d", 452, err)
assert.Equal("manifestID=manID testing message num=452", msg)
assert.False(isErr)
err = errors.New("test error")
msg, isErr = formatMessage(ctx, true, "testing message num=%d", 452, err)
msg, isErr = formatMessage(ctx, true, false, "testing message num=%d", 452, err)
assert.Equal("manifestID=manID testing message num=452 err=\"test error\"", msg)
assert.True(isErr)
}

// Verify we do not leak contextual info inadvertently
func TestPublicLogs(t *testing.T) {
assert := assert.New(t)
// These should be visible:
ctx := AddManifestID(context.Background(), "fooManID")
ctx = AddSessionID(ctx, "fooSessionID")
ctx = AddOrchSessionID(ctx, "fooOrchID")
// These should not be visible:
ctx = AddNonce(ctx, 999)
ctx = AddSeqNo(ctx, 555)
ctx = AddVal(ctx, "foo", "Bar")

publicCtx := PublicCloneCtx(ctx, context.Background(), publicLogKeys)

// Verify the keys in publicLogKeys list gets copied to logs:
val := GetVal(publicCtx, manifestID)
assert.Equal("fooManID", val)
val = GetVal(publicCtx, sessionID)
assert.Equal("fooSessionID", val)
val = GetVal(publicCtx, orchSessionID)
assert.Equal("fooOrchID", val)

// Verify random keys cannot be leaked:
val = GetVal(publicCtx, nonce)
assert.Equal("", val)
val = GetVal(publicCtx, seqNo)
assert.Equal("", val)
val = GetVal(publicCtx, "foo")
assert.Equal("", val)

// Verify [PublicLogs] gets pre-pended:
msg, _ := formatMessage(ctx, false, true, "testing message num=%d", 123)
assert.Equal("[PublicLogs] manifestID=fooManID sessionID=fooSessionID nonce=999 seqNo=555 orchSessionID=fooOrchID foo=Bar testing message num=123", msg)
}
70 changes: 67 additions & 3 deletions server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func removeSessionFromList(sessions []*BroadcastSession, sess *BroadcastSession)
return res
}

func selectSession(sessions []*BroadcastSession, exclude []*BroadcastSession, durMult int) *BroadcastSession {
func selectSession(ctx context.Context, sessions []*BroadcastSession, exclude []*BroadcastSession, durMult int) *BroadcastSession {
for _, session := range sessions {
// A session in the exclusion list is not selectable
if includesSession(exclude, session) {
Expand All @@ -216,6 +216,28 @@ func selectSession(sessions []*BroadcastSession, exclude []*BroadcastSession, du
// threshold is selectable
if len(session.SegsInFlight) == 0 {
if session.LatencyScore > 0 && session.LatencyScore <= SELECTOR_LATENCY_SCORE_THRESHOLD {
/* if session.OrchestratorInfo.GetTicketParams() != nil {
clog.PublicInfof(ctx,
"Selected orchestrator eth-address=, ip-address=, reason=%v",
ethcommon.Bytes2Hex(session.OrchestratorInfo.TicketParams.Recipient),
session.OrchestratorInfo.Transcoder,
fmt.Sprintf(
"performance: no segments in flight, latency score of %v < %v",
session.LatencyScore,
durMult,
),
)
} else {*/
clog.PublicInfof(ctx,
"Selecting new orchestrator, reason=%v",
fmt.Sprintf(
"performance: no segments in flight, latency score of %v < %v",
session.LatencyScore,
durMult,
),
)

//}
return session
}
}
Expand All @@ -230,6 +252,28 @@ func selectSession(sessions []*BroadcastSession, exclude []*BroadcastSession, du
maxTimeInFlight := time.Duration(durMult) * oldestSegInFlight.segDur

if timeInFlight < maxTimeInFlight {
/*if session.OrchestratorInfo.GetTicketParams() != nil {
clog.PublicInfof(ctx,
"Selected orchestrator eth-address=0x%v, ip-address=%v, reason=%v",
ethcommon.Bytes2Hex(session.OrchestratorInfo.TicketParams.Recipient),
session.OrchestratorInfo.Transcoder,
fmt.Sprintf(
"performance: segments in flight, latency score of %v < %v",
session.LatencyScore,
durMult,
),
)
} else {
clog.PublicInfof(ctx,
"Selected orchestrator reason=%v",
fmt.Sprintf(
"performance: segments in flight, latency score of %v < %v",
session.LatencyScore,
durMult,
),
)

}*/
return session
}
}
Expand Down Expand Up @@ -258,7 +302,7 @@ func (sp *SessionPool) selectSessions(ctx context.Context, sessionsNum int) []*B

// Re-use last session if oldest segment is in-flight for < segDur
gotFromLast := false
sess = selectSession(sp.lastSess, selectedSessions, 1)
sess = selectSession(ctx, sp.lastSess, selectedSessions, 1)
if sess == nil {
// Or try a new session from the available ones
sess = sp.sel.Select(ctx)
Expand All @@ -268,7 +312,7 @@ func (sp *SessionPool) selectSessions(ctx context.Context, sessionsNum int) []*B

if sess == nil {
// If no new sessions are available, re-use last session when oldest segment is in-flight for < 2 * segDur
sess = selectSession(sp.lastSess, selectedSessions, 2)
sess = selectSession(ctx, sp.lastSess, selectedSessions, 2)
if sess != nil {
gotFromLast = true
clog.V(common.DEBUG).Infof(ctx, "No sessions in the selector for manifestID=%v re-using orch=%v with acceptable in-flight time",
Expand Down Expand Up @@ -303,6 +347,7 @@ func (sp *SessionPool) selectSessions(ctx context.Context, sessionsNum int) []*B
sess.SegsInFlight = nil
sp.lastSess = removeSessionFromList(sp.lastSess, sess)
clog.V(common.DEBUG).Infof(ctx, "Removing orch=%v from manifestID=%s session list", sess.Transcoder(), sp.mid)
// clog.PublicInfof(ctx, "Removing orch=%v from manifestID=%s session list", sess.Transcoder(), sp.mid)
if monitor.Enabled {
monitor.OrchestratorSwapped(ctx)
}
Expand All @@ -317,6 +362,25 @@ func (sp *SessionPool) selectSessions(ctx context.Context, sessionsNum int) []*B
if !includesSession(selectedSessions, ls) {
clog.V(common.DEBUG).Infof(ctx, "Swapping from orch=%v to orch=%+v for manifestID=%s", ls.Transcoder(),
getOrchs(selectedSessions), sp.mid)
/*if ls.OrchestratorInfo.GetTicketParams() != nil {
clog.PublicInfof(ctx,
"Swapping from orch=%v to orch=%+v, eth-address=0x%v, ip-address=%v, reason=%v",
ls.Transcoder(),
getOrchs(selectedSessions),
ethcommon.Bytes2Hex(ls.OrchestratorInfo.TicketParams.Recipient),
ls.OrchestratorInfo.Transcoder,
fmt.Sprintf("performance: latency score of %v", ls.LatencyScore),
)
} else {
clog.PublicInfof(ctx,
"Swapping from orch=%v to orch=%+v, reason=%v",
ls.Transcoder(),
getOrchs(selectedSessions),
fmt.Sprintf("performance: latency score of %v", ls.LatencyScore),
)

}*/

if monitor.Enabled {
monitor.OrchestratorSwapped(ctx)
}
Expand Down
9 changes: 5 additions & 4 deletions server/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ func TestSelectSession_MultipleInFlight2(t *testing.T) {

func TestSelectSession_NoSegsInFlight(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()

sess := &BroadcastSession{}
sessList := []*BroadcastSession{sess}
Expand All @@ -456,22 +457,22 @@ func TestSelectSession_NoSegsInFlight(t *testing.T) {
sess.SegsInFlight = []SegFlightMetadata{
{startTime: time.Now().Add(time.Duration(-1) * time.Second), segDur: 1 * time.Second},
}
s := selectSession(sessList, nil, 1)
s := selectSession(ctx, sessList, nil, 1)
assert.Nil(s)

// Session has no segs in flight, latency score = 0
sess.SegsInFlight = nil
s = selectSession(sessList, nil, 1)
s = selectSession(ctx, sessList, nil, 1)
assert.Nil(s)

// Session has no segs in flight, latency score > SELECTOR_LATENCY_SCORE_THRESHOLD
sess.LatencyScore = SELECTOR_LATENCY_SCORE_THRESHOLD + 0.001
s = selectSession(sessList, nil, 1)
s = selectSession(ctx, sessList, nil, 1)
assert.Nil(s)

// Session has no segs in flight, latency score > 0 and < SELECTOR_LATENCY_SCORE_THRESHOLD
sess.LatencyScore = SELECTOR_LATENCY_SCORE_THRESHOLD - 0.001
s = selectSession(sessList, nil, 1)
s = selectSession(ctx, sessList, nil, 1)
assert.Equal(sess, s)
}

Expand Down