Skip to content

Commit

Permalink
support sending timestamps with every data chunk to client
Browse files Browse the repository at this point in the history
  • Loading branch information
spq committed Aug 23, 2024
1 parent a624fee commit dfb9c04
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 13 deletions.
99 changes: 87 additions & 12 deletions internal/index/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
Data struct {
Direction Direction
Content []byte
Time time.Time
}
)

Expand All @@ -68,10 +69,7 @@ const (
)

func (dir Direction) Reverse() Direction {
if dir == DirectionClientToServer {
return DirectionServerToClient
}
return DirectionClientToServer
return dir ^ DirectionClientToServer ^ DirectionServerToClient
}

func (hg *readerHostGroup) get(id uint16) net.IP {
Expand Down Expand Up @@ -452,9 +450,50 @@ func (s *Stream) Packets() ([]Packet, error) {
}

func (s *Stream) Data() ([]Data, error) {
data := []Data{}
sr := io.NewSectionReader(s.r.file, int64(s.r.header.Sections[sectionData].Begin+s.DataStart), s.r.header.Sections[sectionData].size()-int64(s.DataStart))
off := int64(s.PacketInfoStart) * int64(unsafe.Sizeof(packet{}))
sr := io.NewSectionReader(s.r.file, int64(s.r.header.Sections[sectionPackets].Begin)+off, s.r.header.Sections[sectionPackets].size()-off)
br := bufio.NewReader(sr)
p := packet{}
refTime := s.FirstPacket()
type packetTime struct {
ts time.Time
sz uint64
}
expectWraps := (time.Duration(s.LastPacketTimeNS-s.FirstPacketTimeNS)*time.Nanosecond + time.Microsecond) / (time.Microsecond << 32)
packetTimes := [2][]packetTime{nil, nil}
lastRelPacketTimeMS := uint32(0)
for {
if err := binary.Read(br, binary.LittleEndian, &p); err != nil {
return nil, err
}
if expectWraps != 0 {
if p.RelPacketTimeMS < lastRelPacketTimeMS {
refTime = refTime.Add(time.Microsecond << 32)
expectWraps--
}
lastRelPacketTimeMS = p.RelPacketTimeMS
}
if p.DataSize != 0 {
ts := refTime.Add(time.Duration(p.RelPacketTimeMS) * time.Microsecond)
ci := &packetTimes[((p.Flags&flagsPacketDirection)/flagsPacketDirection)^uint8(DirectionClientToServer)^(flagsPacketDirectionClientToServer/flagsPacketDirection)]
if len(*ci) != 0 && (*ci)[len(*ci)-1].ts == ts {
(*ci)[len(*ci)-1].sz += uint64(p.DataSize)
} else {
*ci = append(*ci, packetTime{ts, uint64(p.DataSize)})
}
}
if (p.Flags & flagsPacketHasNext) == 0 {
break
}
if p.SkipPacketsForData != 0 && expectWraps == 0 {
if _, err := br.Discard(int(p.SkipPacketsForData) * int(unsafe.Sizeof(packet{}))); err != nil {
return nil, err
}
}
}
data := []Data{}
sr = io.NewSectionReader(s.r.file, int64(s.r.header.Sections[sectionData].Begin+s.DataStart), s.r.header.Sections[sectionData].size()-int64(s.DataStart))
br = bufio.NewReader(sr)

content := [2][]byte{}
content[DirectionClientToServer] = make([]byte, s.ClientBytes)
Expand All @@ -467,7 +506,7 @@ func (s *Stream) Data() ([]Data, error) {
}

position := [2]uint64{}
for dir := DirectionClientToServer; ; dir ^= DirectionClientToServer ^ DirectionServerToClient {
for dir := DirectionClientToServer; ; dir = dir.Reverse() {
if position[DirectionClientToServer] == s.ClientBytes && position[DirectionServerToClient] == s.ServerBytes {
break
}
Expand All @@ -486,14 +525,35 @@ func (s *Stream) Data() ([]Data, error) {
if sz == 0 {
continue
}
data = append(data, Data{
Direction: dir,
Content: content[dir][position[dir]:][:sz],
})
position[dir] += sz
pt := &packetTimes[dir][0]
for {
szCur := sz
if szCur > pt.sz {
szCur = pt.sz
}
data = append(data, Data{
Direction: dir,
Content: content[dir][position[dir]:][:szCur],
Time: pt.ts.Local(),
})
position[dir] += szCur
pt.sz -= szCur
sz -= szCur
if sz == 0 {
break
}
if pt.sz == 0 {
packetTimes[dir] = packetTimes[dir][1:]
pt = &packetTimes[dir][0]
}
}
if pt.sz == 0 {
packetTimes[dir] = packetTimes[dir][1:]
}
}
return data, nil
}

func (s *Stream) FirstPacket() time.Time {
return s.r.ReferenceTime.Add(time.Duration(s.FirstPacketTimeNS) * time.Nanosecond)
}
Expand Down Expand Up @@ -558,3 +618,18 @@ func (r *Reader) sectionReader(section section) *io.SectionReader {
s := r.header.Sections[section]
return io.NewSectionReader(r.file, int64(s.Begin), s.size())
}

func (d *Data) MarshalJSON() ([]byte, error) {
if d.Time.IsZero() {
return json.Marshal(struct {
Direction Direction
Content []byte
}{Direction: d.Direction, Content: d.Content})
} else {
return json.Marshal(struct {
Direction Direction
Content []byte
Time time.Time
}{Direction: d.Direction, Content: d.Content, Time: d.Time})
}
}
3 changes: 3 additions & 0 deletions internal/index/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func TestLongPackets(t *testing.T) {
if got, want := data[i].Direction, [2]Direction{DirectionClientToServer, DirectionServerToClient}[i&1]; got != want {
t.Errorf("data[%d].Direction = %v, want %v", i, got, want)
}
if got, want := data[i].Time, t1.Add(time.Duration(i)*4*time.Minute); !got.Equal(want) {
t.Errorf("data[%d].Time = %v, want %v", i, got, want)
}
}
for i := 0; i < 40; i++ {
if got, want := packets[i].PcapIndex, uint64(i); got != want {
Expand Down
4 changes: 3 additions & 1 deletion web/src/apiClient.guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ export function isStreamData(obj: unknown): obj is StreamData {
typeof e === "object" ||
typeof e === "function") &&
typeof e["Direction"] === "number" &&
typeof e["Content"] === "string"
typeof e["Content"] === "string" &&
(typeof e["Time"] === "undefined" ||
typeof e["Time"] === "string")
) &&
Array.isArray(typedObj["Tags"]) &&
typedObj["Tags"].every((e: any) =>
Expand Down
1 change: 1 addition & 0 deletions web/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export type SearchResponse = SearchResult | Error;
export type Data = {
Direction: number;
Content: Base64;
Time?: DateTimeString;
};

/** @see {isStreamData} ts-auto-guard:type-guard */
Expand Down

0 comments on commit dfb9c04

Please sign in to comment.