Parsing improvements #874

Merged: 15 commits merged into master from parsing-optimization on Oct 15, 2024
Conversation

@Jolanrensen Jolanrensen commented Sep 19, 2024

fixes #849

Small logic rewrite for tryParseImpl and added kdocs.

StringParsers can now be "covered by" another parser, meaning they will be skipped if the other parser is run. For instance, it makes no sense to check whether a string can be parsed as a java.time.Instant if it cannot be parsed as a kotlinx.datetime.Instant.

The reason I didn't simply remove parsers that are covered by other parsers is to keep open the option of skipping some parser types in the future. Say a user wants the Java date-time classes instead of the Kotlin ones: the Kotlin parsers can be skipped and the Java ones will still run. This would need to be implemented separately in the future, but I have plans to implement parser-skipping for CSV readers that already handle the parsing of some types but not all.
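As a rough sketch of the "covered by" idea (illustrative names and signatures, not the actual DataFrame internals), a parser runner can skip any parser whose covering parser has already been attempted:

```kotlin
import kotlin.reflect.KType
import kotlin.reflect.typeOf

// Illustrative sketch, not the real parse.kt interface: a parser declares
// which other parser types make running it redundant.
interface StringParser<T : Any> {
    val type: KType
    val coveredBy: Collection<KType>
    fun parseOrNull(str: String): T?
}

// Run parsers in order, skipping any parser covered by one already attempted.
fun runParsers(parsers: List<StringParser<*>>, value: String): Any? {
    val attempted = mutableSetOf<KType>()
    for (parser in parsers) {
        if (parser.coveredBy.any { it in attempted }) continue
        attempted += parser.type
        parser.parseOrNull(value)?.let { return it }
    }
    return null
}
```

With this shape, an unused parser costs nothing when its covering parser has already failed, yet it can still run if the covering parser is skipped.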

More importantly, this PR removes as many exceptions as possible from the "default path" of parsing:

When running parse() on a DF with columns of normal strings (a pretty common scenario), all parsers are run for each of the r rows in each column. Many date-time parsers threw exceptions that needed to be caught, which is a heavy affair (about 15 exceptions per String, according to the issue). Usually r is 1, which limits the expected number of exceptions; however, it still scales with the number of columns c. It's easy to construct a worst-case scenario: imagine a DF consisting of valid LocalTimes for all rows except the final row (maybe a user made a copy-paste error). We now get 14 exceptions times r rows times c columns, and the resulting DF is unchanged... We probably cannot completely erase this scenario, but we can at least limit the number of exceptions thrown :)

To avoid exceptions I did the following:

  • kotlinx.datetime.Instant
    • Instant.parse internally calls DateTimeComponents.Formats.ISO_DATE_TIME_OFFSET.parse().toInstantUsingOffset(), which throws on failure, so we simply use the exception-free parseOrNull() variant of that format instead. Plus, to catch leap seconds, when that fails to parse, we try the Java instants too.
  • java.time.Duration
    • This function works by applying a regex to the String. I simply copied that regex so we can manually check whether the String matches before passing it to the Java function. Tests are in place to notify us of any changes.
  • kotlin.time.Duration
    • This was a bit more difficult, as it supports both the ISO-8601 format, like Java, and its own String notation, like "1d 23h 2s". The approach I chose was to copy the logic from the stdlib and replace the exceptions with return null. This might be a bit harder to maintain; however, I put some tests in place to check for behavioral changes on the Kotlin side. (Tests that are "inspired" by the official ones.)
  • java.time.Instant, java.time.LocalDateTime, java.time.LocalDate, java.time.LocalTime
    • Instead of calling DateTimeFormatter.parse directly, we can call parseUnresolved first. Its failure can be detected without an exception being thrown. If it does not fail, we call the normal parse, which now has a much lower chance of throwing.
  • JSON
    • Expanded the checks a tiny bit so that the chance of JSON exceptions is slightly lower.
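The parseUnresolved trick for the java.time parsers can be sketched like this (hypothetical helper name; the real implementation lives in parse.kt):

```kotlin
import java.text.ParsePosition
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// A first pass with parseUnresolved rejects malformed input without throwing;
// only near-valid strings reach the real parse(), which can still fail during
// resolution (e.g. "2024-02-30"), but that case is rare.
fun parseLocalDateOrNull(str: String, formatter: DateTimeFormatter): LocalDate? {
    val position = ParsePosition(0)
    val unresolved = formatter.parseUnresolved(str, position)
    if (unresolved == null || position.errorIndex >= 0 || position.index < str.length) {
        return null // malformed input, rejected without any exception
    }
    return try {
        LocalDate.parse(str, formatter)
    } catch (e: Exception) {
        null // syntactically valid but unresolvable, e.g. day 30 in February
    }
}
```

The same pattern applies to the other java.time types by swapping the resolving call.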

Finally I used coroutines (as suggested in #723) to parallelize the parse operation per-column.
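A minimal sketch of the per-column parallelism (illustrative signature, not the actual implementation):

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope

// Each column is parsed in its own coroutine; awaitAll preserves column order.
suspend fun <C, R> parseColumnsInParallel(columns: List<C>, parseColumn: (C) -> R): List<R> =
    coroutineScope {
        columns.map { col -> async { parseColumn(col) } }.awaitAll()
    }
```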

@Jolanrensen (Collaborator, Author):

I ran a small local test with the IntelliJ profiler to see how effective dropping the exceptions is. The test might not reflect real-world differences, but I emphasised the worst-case scenario (i.e., a DF with many String columns).

The test:

val df = dataFrameOf(List(5_000) { "_$it" }).fill(100) {
    Random.nextInt().toChar().toString() + Random.nextInt().toChar()
} // a 100r x 5000c DF filled with strings

df.parse()

All parsers are run, coveredBy is disabled for these tests.

These are the results:

| Type                           | CPU time (old) | CPU time (new) | Memory allocations (old) | Memory allocations (new) |
|--------------------------------|---------------:|---------------:|-------------------------:|-------------------------:|
| kotlinx.datetime.Instant       | 9,563 ms       | 990 ms         | 5.41 GB                  | 929.55 MB                |
| java.time.Instant              | 2,851 ms       | 140 ms         | 1.42 GB                  | 423.11 MB                |
| kotlinx.datetime.LocalDateTime | 10,314 ms      | 160 ms         | 4.45 GB                  | 480.71 MB                |
| java.time.LocalDateTime        | 10,344 ms      | 190 ms         | 4.45 GB                  | 481.29 MB                |
| kotlinx.datetime.LocalDate     | 10,634 ms      | 190 ms         | 4.44 GB                  | 488.12 MB                |
| java.time.LocalDate            | 10,053 ms      | 180 ms         | 4.45 GB                  | 486.04 MB                |
| kotlin.time.Duration           | 4,492 ms       | 40 ms          | 1.7 GB                   | 24.64 MB                 |
| java.time.Duration             | 2,501 ms       | 30 ms          | 840.99 MB                | 123.74 MB                |
| kotlinx.datetime.LocalTime     | 10,894 ms      | 160 ms         | 4.44 GB                  | 483.92 MB                |
| java.time.LocalTime            | 9,674 ms       | 210 ms         | 4.44 GB                  | 490.72 MB                |

@Jolanrensen Jolanrensen marked this pull request as ready for review September 23, 2024 19:08
@@ -55,10 +67,17 @@ internal interface StringParser<T> {

fun applyOptions(options: ParserOptions?): (String) -> T?

/** If a parser with one of these types is run, this parser can be skipped. */
val coveredBy: Collection<KType>
Collaborator:

The comment is clear, but the naming is probably a bit weird.

@Jolanrensen (Collaborator, Author):

any suggestions? :)

// parse each value/frame column at any depth inside each DataFrame in the frame column
col.isFrameColumn() ->
col.values.map {
async {
Collaborator:

Do we really need a 2-level coroutine here? How many coroutines will be created when parsing a large file, say 1,000,000 rows by 100 columns?

Collaborator:

We probably need to set up the correct dispatcher, or give the user the ability to do so:
https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-dispatcher/

If we run with the default dispatcher, it could consume too many resources on the machine.

@Jolanrensen (Collaborator, Author):

Do we really need a 2-level coroutine here? How many coroutines will be created when parsing a large file, say 1,000,000 rows by 100 columns?

Coroutines are cheap :) DataFrames in the cells of frame columns are independent of each other, as are columns in a DF. It only makes sense to split them off in different coroutine branches.

Probably we need to set up correct dispatcher or give this ability to the user

I agree, but I'm not sure how to do this neatly from the API. The correct way to do it would be to make parse() and all its overloads suspend functions; that way the user can decide in which scope and with which dispatcher to run it. The problem is that the DF API is not built around coroutines, nor should users be forced to put every call in a suspending context, so this would require all overloads to be written twice, both with and without suspend... @koperagen any ideas?


// Base case, parse the column if it's a `String?` column
col.isSubtypeOf<String?>() ->
col.cast<String?>().tryParse(options)
Collaborator:

Are we still throwing some exceptions here, in tryParse?

@Jolanrensen (Collaborator, Author):

No, DataColumn.tryParse means it tries to parse it, but if it fails, it will keep it as String. This is in contrast with DataColumn.parse which does throw an exception if it couldn't be parsed.

I'll add some quick kdoc, because it looks confusing indeed.
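The contrast can be illustrated with a simplified Int-only version (not the actual DataFrame API):

```kotlin
// tryParse-style: convert the whole column if every value parses,
// otherwise keep the original Strings untouched.
fun tryParseInts(values: List<String>): List<Any> {
    val parsed = values.map { it.toIntOrNull() }
    return if (parsed.all { it != null }) parsed.filterNotNull() else values
}

// parse-style: throws NumberFormatException if any value fails to parse.
fun parseInts(values: List<String>): List<Int> = values.map { it.toInt() }
```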

@zaleslaw (Collaborator) left a comment:

I worry about the parallel parsing a little bit.

Also I had a thought: what if we enable DEBUG logging inside catchSilent, e.g.

catchSilent {
    logger.debug("parsing problems")
}

@Jolanrensen (Collaborator, Author):

I worry about the parallel parsing a little bit.

Also I had a thought: what if we enable DEBUG logging for catchSilent { logger.debug("parsing problems") }

The parallel part might indeed need a little redesign, especially if we'd want to introduce coroutines in other parts of the library where they might be useful.

Personally, I wouldn't add logging to each individual parser, unless we can guarantee zero performance impact when the logging level is not DEBUG. But even then, with debug enabled we would generate a TON of logs when parsing.

@koperagen (Collaborator):

I'd try ParallelStream instead of coroutines

@@ -142,9 +157,340 @@ class ParseTests {
columnOf("2022-01-23T04:29:40").parse().type shouldBe typeOf<LocalDateTime>()
}

@Test
fun `can parse instants`() {
Collaborator:

Can we ask the stdlib for a non-throwing parsing function?

Collaborator:

ah, my bad, it's from Java, right?

Collaborator:

In any case, I want to understand better which copy-pasted parts are needed.

@Jolanrensen (Collaborator, Author) commented Oct 8, 2024:

For Kotlin Instants, we've got parseOrNull, a non-throwing parsing function :) For Java, I managed to make my own parseOrNull.

I copied over a lot of tests from kotlinx-datetime so that we can catch functional changes when we update the Java/kotlinx-datetime versions.

The biggest thing I copied concerned Kotlin's Duration. It would be great if we could have a non-throwing Duration.parseOrNull in the stdlib. I solved this by creating a function Duration.canParse, which contains the copied logic but returns false instead of throwing an exception. For this I also put plenty of tests in place to catch functional changes if we bump the Kotlin version.
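For context: a naive non-throwing wrapper (equivalent to what the stdlib's Duration.parseOrNull does internally, as far as I can tell) still constructs and catches the exception on bad input, which is exactly the cost the copied canParse logic avoids:

```kotlin
import kotlin.time.Duration

// Naive wrapper: no exception escapes, but one is still constructed and
// caught internally for every unparseable String.
fun parseDurationOrNull(str: String): Duration? =
    try {
        Duration.parse(str)
    } catch (e: IllegalArgumentException) {
        null
    }
```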

@Jolanrensen (Collaborator, Author) commented Oct 8, 2024:

I'd try ParallelStream instead of coroutines

ParallelStream does work; however, for big JSON-like DataFrames I'm a bit concerned: how does ParallelStream handle a node that spawns multiple other parallel streams, as happens with frame columns? I know coroutines neatly manage and distribute their work. Plus, they're more multiplatform-ready.

@Jolanrensen (Collaborator, Author) commented Oct 8, 2024:

@zaleslaw @koperagen I adjusted the coroutine implementation so that users can supply how they want the parser to run in their own CoroutineScope, if they desire to do so.

Example can be found in the tests:

fun `Parse with different coroutine scope or context`() {

I made the parse functions inline so as to "leak" the suspend scope inside. The benefit of this notation is that the function can be called both inside and outside suspend functions, while still allowing the user to control how it's executed.

More explanation here:

public typealias CoroutineProvider<T> = (suspend CoroutineScope.() -> T) -> T
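A sketch of how such a provider can be used (parseWith is a hypothetical stand-in for the real parse overloads, written without inline for simplicity):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking

// The caller supplies a CoroutineProvider deciding how the suspending body
// is executed (runner has the shape of the CoroutineProvider typealias).
fun <T> parseWith(
    runner: (suspend CoroutineScope.() -> T) -> T,
    body: suspend CoroutineScope.() -> T,
): T = runner(body)

fun main() {
    // From non-suspending code, block the current thread with runBlocking:
    val answer = parseWith({ runBlocking(block = it) }) { 40 + 2 }
    println(answer) // prints 42
}
```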

@Jolanrensen (Collaborator, Author):

Removing parallel behavior for now. We can discuss it in #723

Contributor:

Generated sources will be updated after merging this PR.
Please inspect the changes here.

@Jolanrensen Jolanrensen merged commit a8cee48 into master Oct 15, 2024
5 checks passed
@Jolanrensen Jolanrensen deleted the parsing-optimization branch October 15, 2024 12:34
Successfully merging this pull request may close these issues.

DataFrame.parse() performance issue for wide DF