
Add new feature: aggregation and order by #10

Open
gabrielmocan opened this issue Apr 8, 2024 · 10 comments
Labels: feature request

Comments

@gabrielmocan

Hi Pete, it's been a while on this project.

It's working very nicely, but may I ask for a few features?

My biggest problem currently is very large flow data, mostly due to DDoS attacks. I have a few exporters that sometimes push more than 300MB of data in a single minute, exceeding 3M flows in the nffile.

I would like to be able to aggregate fields, pretty much like the -A parameter from classic nfdump, as well as an equivalent of -O to order the output.

My intention is to do some sort of downsampling in those cases - some kind of 'aggregate by values < x'. I'm open to ideas as well.

@gabrielmocan
Author

ORDER BY would be the most desirable for now, as aggregation can be done further down the pipeline.

@phaag
Owner

phaag commented Apr 10, 2024

Hi Gabe,
Sure - it's always welcome!
I need to check how I could implement this efficiently. So far, go-nfdump is a reader and nothing more. The -A aggregation is not that easy in Go, but let me see what I can do. Just allow me some time to experiment.

@gabrielmocan
Author

@phaag no rush, I've managed to do some workarounds here to downsample; still, this is a desired feature.

Do you think -O is easier than -A? -O would help me a lot when downsampling, as I'm doing a 'less than x packets' cutoff logic. Ordering the output by packets without having to read all data blocks would optimize this process, as I could enter a 1:N downsampling loop as soon as the cutoff point is reached.

For now I just read the entire nffile, then sort the slice and downsample records that have 'less than x packets'.
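For reference, the sort-then-cutoff approach described above could be sketched in plain Go roughly like this (the `Flow` struct and its fields are stand-ins for illustration, not go-nfdump's actual `FlowRecordV3` type):

```go
package main

import (
	"fmt"
	"sort"
)

// Flow is a stand-in for a decoded flow record.
type Flow struct {
	Packets uint64
	Bytes   uint64
}

// downsample keeps every flow with at least `cutoff` packets and keeps
// only every n-th flow below the cutoff (1:n downsampling). Sorting by
// packets descending first means the loop can switch to sampling as
// soon as the cutoff point is reached.
func downsample(flows []Flow, cutoff uint64, n int) []Flow {
	sort.Slice(flows, func(i, j int) bool {
		return flows[i].Packets > flows[j].Packets
	})
	out := make([]Flow, 0, len(flows))
	seen := 0 // flows seen below the cutoff
	for _, f := range flows {
		if f.Packets >= cutoff {
			out = append(out, f)
			continue
		}
		if seen%n == 0 {
			out = append(out, f)
		}
		seen++
	}
	return out
}

func main() {
	flows := []Flow{{10, 1000}, {1, 100}, {500, 50000}, {2, 200}, {1, 80}}
	// Keeps both flows >= 10 packets, plus 1 of the 3 smaller flows.
	fmt.Println(len(downsample(flows, 10, 3)))
}
```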

@phaag
Owner

phaag commented May 9, 2024

@gabrielmocan - I've created a new branch work for testing. Could you please check out the work branch and test?

Changes in existing code:
AllRecords() no longer returns a channel, but a chain object. This enables chaining processing filters - in this case for -O, and maybe for more in the future. To get the final records, use Get() as the final chain element.

Example - simply list all records:

    if recordChannel, err := nffile.AllRecords().Get(); err != nil {
        fmt.Printf("Failed to process flows: %v\n", err)
    } else {
        for record := range recordChannel {
            record.PrintLine()
        }
    }

The new chain processing function is OrderBy(type, direction). This processing element adds ordering of the records, equivalent to nfdump -O tstart:a or nfdump -O tstart:d, for example. Currently OrderBy is limited to tstart, tend, packets and bytes, and direction can be ASCENDING or DESCENDING. It can be extended if needed.
At the end of the chain, Get() the records and process them as usual.

You will find some example code in the folder example/sorter:

    if recordChannel, err := nffile.AllRecords().OrderBy("bytes", nfdump.DESCENDING).Get(); err != nil {
        fmt.Printf("Failed to process flows: %v\n", err)
    } else {
        for record := range recordChannel {
            record.PrintLine()
        }
    }

Please send me your feedback. With your feedback integrated, I can merge the work branch into main.
A -A aggregation can be done the same way, but it needs some work for an efficient hash table.
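The hash-table pass for a -A style aggregation could be sketched with a plain Go map along these lines (the `record` and `aggKey` types and their fields are purely illustrative assumptions, not go-nfdump's API):

```go
package main

import "fmt"

// record is a stand-in for a decoded flow record.
type record struct {
	Src, Dst string
	Packets  uint64
	Bytes    uint64
}

// aggKey groups flows the way nfdump -A srcip,dstip would.
type aggKey struct {
	Src, Dst string
}

// aggVal holds the summed counters for one aggregation key.
type aggVal struct {
	Flows, Packets, Bytes uint64
}

// aggregate sums flow, packet and byte counters per key in a single
// pass, using a Go map as the hash table.
func aggregate(records []record) map[aggKey]aggVal {
	agg := make(map[aggKey]aggVal)
	for _, r := range records {
		k := aggKey{r.Src, r.Dst}
		v := agg[k]
		v.Flows++
		v.Packets += r.Packets
		v.Bytes += r.Bytes
		agg[k] = v
	}
	return agg
}

func main() {
	recs := []record{
		{"10.0.0.1", "10.0.0.2", 10, 1200},
		{"10.0.0.1", "10.0.0.2", 5, 600},
		{"10.0.0.3", "10.0.0.2", 1, 40},
	}
	for k, v := range aggregate(recs) {
		fmt.Printf("%s -> %s: %d flows, %d packets, %d bytes\n",
			k.Src, k.Dst, v.Flows, v.Packets, v.Bytes)
	}
}
```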

@gabrielmocan
Author

@phaag I'll do some testing and get back to you with feedback. Thanks in advance!

@phaag phaag self-assigned this May 10, 2024
@phaag phaag added the feature request Feature request label May 10, 2024
@phaag
Owner

phaag commented May 11, 2024

> @phaag no rush, I've managed to do some workarounds here to downsample; still, this is a desired feature.
>
> Do you think -O is easier than -A? -O would help me a lot when downsampling, as I'm doing a 'less than x packets' cutoff logic. Ordering the output by packets without having to read all data blocks would optimize this process, as I could enter a 1:N downsampling loop as soon as the cutoff point is reached.
>
> For now I just read the entire nffile, then sort the slice and downsample records that have 'less than x packets'.

-A should be doable as well

@gabrielmocan
Author

@phaag after some testing, the function is working as expected. We can try -A, if viable.

@gabrielmocan
Author

@phaag after further testing, I noticed that if the nffile has more than 1024*1024 records, the code panics.

I've tracked this down to these default values in orderby.go

// store all flow records into an array for later printing
// initial len - 1 meg
recordArray = make([]*FlowRecordV3, 1024*1024)
// store value to be sorted and index of appropriate flow record of
// recordArray. initial len - 1 meg
sortArray = make([]sortRecord, 1024*1024)

If I change these default values to something greater than the flow count, the panic is gone.

Could we create those slices based on nffile.StatRecord.Numflows? That would be an exact match - no need to resize.
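A minimal sketch of the suggested fix, assuming the flow count is known up front before the arrays are allocated (plain int slices stand in for the real recordArray/sortArray element types):

```go
package main

import "fmt"

// buildArrays sizes both slices from the known flow count - the idea
// being to use nffile.StatRecord.Numflows - so writes by index can
// never run past the end and no resizing is ever needed.
func buildArrays(numFlows int) ([]int, []int) {
	recordArray := make([]int, numFlows) // one slot per flow record
	sortArray := make([]int, numFlows)   // parallel sort keys
	return recordArray, sortArray
}

func main() {
	// Well above the fixed 1024*1024 default that caused the panic.
	records, keys := buildArrays(3 * 1024 * 1024)
	fmt.Println(len(records), len(keys))
}
```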

Sample sent via e-mail.

phaag added a commit that referenced this issue May 20, 2024
@phaag
Owner

phaag commented May 20, 2024

It's fixed in work branch. Please test!
Thanks

@gabrielmocan
Author

> It's fixed in work branch. Please test! Thanks

It works just fine 😎
