Skip to content

Commit

Permalink
fix an issue with Stream.Packets not reporting correct timestamps for…
Browse files Browse the repository at this point in the history
… long running (more than 2**32 microseconds or ~1h12m) - add a test for it as well
  • Loading branch information
spq committed Aug 22, 2024
1 parent 63500e3 commit 1b3178c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 14 deletions.
4 changes: 2 additions & 2 deletions cmd/pkappa2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,9 +740,9 @@ func main() {
}
switch a & AspectAnchor {
case AspectAnchorFirst:
t = s.FirstPacket()
t = s.FirstPacket().Local()
case AspectAnchorLast:
t = s.LastPacket()
t = s.LastPacket().Local()
}
t = t.Truncate(delta)
if skip = (!min.IsZero() && min.After(t)) || (!max.IsZero() && max.Before(t)); skip {
Expand Down
17 changes: 11 additions & 6 deletions internal/index/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ func (s *Stream) Packets() ([]Packet, error) {
flagsPacketDirectionClientToServer: DirectionClientToServer,
flagsPacketDirectionServerToClient: DirectionServerToClient,
}
refTime := s.r.ReferenceTime.Add(time.Nanosecond * time.Duration(s.FirstPacketTimeNS))
refTime := s.FirstPacket()
lastRelPacketTimeMS := uint32(0)
for i := uint64(s.PacketInfoStart); ; i++ {
p, err := s.r.packetByIndex(i)
if err != nil {
Expand All @@ -432,11 +433,15 @@ func (s *Stream) Packets() ([]Packet, error) {
lastImportID = int(p.ImportID)
lastPacketIndex = int(p.PacketIndex)
imp := s.r.imports[p.ImportID]
if p.RelPacketTimeMS < lastRelPacketTimeMS {
refTime = refTime.Add(time.Microsecond << 32)
}
lastRelPacketTimeMS = p.RelPacketTimeMS
packets = append(packets, Packet{
PcapFilename: imp.filename,
PcapIndex: imp.packetIndexOffset + uint64(p.PacketIndex),
Direction: dir[p.Flags&flagsPacketDirection],
Timestamp: refTime.Add(time.Duration(p.RelPacketTimeMS * uint32(time.Millisecond))),
Timestamp: refTime.Add(time.Duration(p.RelPacketTimeMS) * time.Microsecond),
})
}
if p.Flags&flagsPacketHasNext == 0 {
Expand Down Expand Up @@ -490,11 +495,11 @@ func (s *Stream) Data() ([]Data, error) {
return data, nil
}
func (s *Stream) FirstPacket() time.Time {
return s.r.ReferenceTime.Add(time.Duration(s.FirstPacketTimeNS) * time.Nanosecond).Local()
return s.r.ReferenceTime.Add(time.Duration(s.FirstPacketTimeNS) * time.Nanosecond)
}

func (s *Stream) LastPacket() time.Time {
return s.r.ReferenceTime.Add(time.Duration(s.LastPacketTimeNS) * time.Nanosecond).Local()
return s.r.ReferenceTime.Add(time.Duration(s.LastPacketTimeNS) * time.Nanosecond)
}

func (s *Stream) Reader() *Reader {
Expand All @@ -515,8 +520,8 @@ func (s *Stream) MarshalJSON() ([]byte, error) {
Index string
}{
ID: s.ID(),
FirstPacket: s.FirstPacket(),
LastPacket: s.LastPacket(),
FirstPacket: s.FirstPacket().Local(),
LastPacket: s.LastPacket().Local(),
Client: SideInfo{
Host: s.r.hostGroups[s.HostGroup].get(s.ClientHost).String(),
Port: s.ClientPort,
Expand Down
116 changes: 110 additions & 6 deletions internal/index/reader_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package index

import (
"bytes"
"fmt"
"testing"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/reassembly"
"github.com/spq/pkappa2/internal/index/streams"
pcapmetadata "github.com/spq/pkappa2/internal/tools/pcapMetadata"
)

func TestReader(t *testing.T) {
tmpDir := t.TempDir()
t1, err := time.Parse(time.RFC3339, "2020-01-01T12:00:00Z")
if err != nil {
t.Fatalf("time.Parse failed with error: %v", err)
}
streams := map[uint64]streamInfo{}
for i := 0; i < 10; i++ {
streams[uint64(i*100)] = makeStream("1.2.3.4:1234", "4.3.2.1:4321", t1.Add(time.Hour*time.Duration(i)), []string{fmt.Sprintf("foo%d", i)})
Expand Down Expand Up @@ -58,11 +58,115 @@ func TestReader(t *testing.T) {
if len(packets) != 3 {
t.Errorf("len(Stream.Packets()) = %v, want 3", len(packets))
}
if got, want := s1.FirstPacket().UTC(), t1.Add(time.Hour*time.Duration(streamID/100)).UTC(); !got.Equal(want) {
if got, want := s1.FirstPacket(), t1.Add(time.Hour*time.Duration(streamID/100)).UTC(); !got.Equal(want) {
t.Errorf("Stream[%d].FirstPacket() = %v, want %v", streamID, got, want)
}
if got, want := s1.LastPacket().UTC(), t1.Add(time.Hour*time.Duration(streamID/100)+time.Second*3).UTC(); !got.Equal(want) {
if got, want := s1.LastPacket(), t1.Add(time.Hour*time.Duration(streamID/100)+time.Second*3).UTC(); !got.Equal(want) {
t.Errorf("Stream[%d].LastPacket() = %v, want %v", streamID, got, want)
}
}
}

func TestLongPackets(t *testing.T) {
tmpDir := t.TempDir()
pi := pcapmetadata.PcapInfo{
Filename: "foo.pcap",
Filesize: 123,
PacketTimestampMin: t1,
PacketTimestampMax: t1.Add(19 * 4 * time.Minute),
ParseTime: t1.Add(2 * time.Hour),
PacketCount: 3,
}
s := streams.Stream{
ClientAddr: []byte("AAAA"),
ServerAddr: []byte("BBBB"),
ClientPort: 123,
ServerPort: 456,
Flags: streams.StreamFlagsProtocolTCP | streams.StreamFlagsComplete,
}
for i := 0; i < 40; i++ {
s.Packets = append(s.Packets, gopacket.CaptureInfo{
CaptureLength: 123,
Length: 123,
Timestamp: t1.Add(time.Duration(i) * 2 * time.Minute),
AncillaryData: []interface{}{
&pcapmetadata.PcapMetadata{
PcapInfo: &pi,
Index: uint64(i),
},
},
})
if i%2 == 0 {
b := []byte("A")
if i == 6 {
b = make([]byte, 1<<17)
b[0] = 'A'
b[len(b)-1] = 'A'
}
s.Data = append(s.Data, streams.StreamData{
Bytes: b,
PacketIndex: uint64(i),
})
}
s.PacketDirections = append(s.PacketDirections, reassembly.TCPDirClientToServer)
s.PacketDirections = append(s.PacketDirections, reassembly.TCPDirClientToServer)
}
streams := map[uint64]streamInfo{
0: {s: s},
}
idx, err := makeIndex(tmpDir, streams, nil)
if err != nil {
t.Fatalf("makeIndex failed: %v", err)
}
var data []Data
var packets []Packet
if err := idx.AllStreams(func(s *Stream) error {
d, err := s.Data()
if err != nil {
return err
}
p, err := s.Packets()
if err != nil {
return err
}
data = d
packets = p
return nil
}); err != nil {
t.Fatalf("Reader.AllStreams failed with error: %v", err)
}
if len(data) != 20 {
t.Fatalf("len(data) = %v, want 20", len(data))
}
if len(packets) != 40 {
t.Fatalf("len(packets) = %v, want 40", len(packets))
}
for i := 0; i < 20; i++ {
wantData := []byte("A")
if i == 3 {
wantData = make([]byte, 1<<17)
wantData[0] = 'A'
wantData[len(wantData)-1] = 'A'
}
if got := data[i].Content; !bytes.Equal(got, wantData) {
t.Errorf("data[%d].Content = %v, want %v", i, got, wantData)
}
if got, want := data[i].Direction, DirectionClientToServer; got != want {
t.Errorf("data[%d].Direction = %v, want %v", i, got, want)
}
}
for i := 0; i < 40; i++ {
if got, want := packets[i].PcapIndex, uint64(i); got != want {
t.Errorf("packets[%d].PcapIndex = %v, want %v", i, got, want)
}
if got, want := packets[i].PcapFilename, "foo.pcap"; got != want {
t.Errorf("packets[%d].PcapFilename = %v, want %v", i, got, want)
}
if got, want := packets[i].Direction, DirectionClientToServer; got != want {
t.Errorf("packets[%d].Direction = %v, want %v", i, got, want)
}
if got, want := packets[i].Timestamp, t1.Add(time.Duration(i)*2*time.Minute); !got.Equal(want) {
t.Errorf("packets[%d].Timestamp = %v, want %v", i, got, want)
}
}
}

0 comments on commit 1b3178c

Please sign in to comment.