Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow monadic effects in fold #1

Open
bgamari opened this issue Sep 30, 2016 · 11 comments
Open

Allow monadic effects in fold #1

bgamari opened this issue Sep 30, 2016 · 11 comments

Comments

@bgamari
Copy link

bgamari commented Sep 30, 2016

Sometimes there is just no avoid effects. The foldl packages' FoldM type captures this variety of fold nicely.

@nikita-volkov
Copy link
Owner

It looks like what you need is the "hasql-cursor-transaction" package, which provides a lower level of abstraction.

@nikita-volkov
Copy link
Owner

Also can you please provide details on your use-case?

@bgamari
Copy link
Author

bgamari commented Oct 1, 2016

Nikita Volkov [email protected] writes:

It looks like what you need is the "hasql-cursor-transaction"
package
,
which provides a lower level of abstraction.

That looks like a reasonable solution although a FoldM approach would
be nice.

In my case I am reading a large volume of records from a database and
slurping them into a compact, transiently-mutable datastructure

@nikita-volkov
Copy link
Owner

nikita-volkov commented Oct 1, 2016

If it's mutable I guess it implies that the monad that you need is either IO or ST and that makes me suspicious about how you plan to reach out to that monad from either of the context monads that you get with the current API (Session or Transaction). Keep in mind that the Transaction monad prohibits lifting of IO for valid reasons.

It would be beneficial for the discussion, if you provided an end-example of how you would use the presumed FoldM implementation that you expect.

@axman6
Copy link

axman6 commented Nov 8, 2017

I'm also quite keen to see this happen, I'm looking into the upcoming streaming result work in servant, which has an interface where you need to provide a function which can accept a function to pass in data to be streamed to the client (something along the lines of ((ByteString -> IO b) -> IO a) where the user is supposed to pass in byte strings as they are produced to send to the client. As it stands, there is no way to use cursors to send data streamed from the database to the client as it's produced, requiring the whole result to be placed in memory before being serialised and sent out.

I can't really see the point of the hasql cursor work if this sort of workflow isn't possible, I would've thought that streaming data from the database would be one of the primary uses for it, but I can't see how, since none of the relevant monads implement MonadIO or anything else that would allow for interleaved reading and writing.

@nikita-volkov
Copy link
Owner

nikita-volkov commented Nov 8, 2017

The reason for the existing limitations is that the Transaction abstraction from the "hasql-transaction" package doesn't allow IO. It is so because it automates the concurrency conflict resolution by reexecuting comletely in case of concurrency conflicts. This means that if it allowed side effects, they'd possibly be executed multiple times and you would never know how many.

However I see your point. This means that such an abstraction doesn't fit your case and what you need is manual control over the concurrency errors. No problem, just execute your transaction manually. After all, it's just about wrapping your session in the "BEGIN" and "COMMIT" statements.

Having your use case now, I think I have some rough ideas for the update of the "hasql-transaction" design. I wouldn't bet on getting them released any time soon though, so I suggest you to go on solving your problem manually.

@axman6
Copy link

axman6 commented Nov 8, 2017

Thanks Nikita, that's good news. Our current app is performing well enough at the moment, but I'm sure it would perform better if we could stream the data. In case it helps you at all, the PR for the servant streaming support is haskell-servant/servant#836, with the particular part of the interface that's relevant here being the StreamGenerator defined at https://github.com/haskell-servant/servant/pull/836/files#diff-b9ed6ea5569e0ebb1cdcf2f89e536e6cR32

I believe the plan is to rewrite/add functionality to packages such as servant-cassava to support this interface.

@vdukhovni
Copy link

I am also running into similar difficulties. I am fetching O(3 million) rows from the database to generate a report that I am writing to the file-system. I don't need retriable transactions, I just need to run in I/O and be able to output rows as they are fetched in constant memory.

A foldM API (inside a non-retriable transaction) would be ideal. When you say "just execute your transaction manually", what do you mean? I know how begin and end transactions, but what I don't know how to do is fetch and stream rows in constant space with Hasql.

In other words, what goes in the "session"? Manual calls to "DECLARE CURSOR", "FETCH", ... bypassing this API?

@vdukhovni
Copy link

FWIW, in my case the row decoder generates an "IO ()" action that output the row. My foldM would just be (>>), but what I'm missing is a clean way to get the fold to happen as the query results are streaming in.

@vdukhovni
Copy link

Indeed a manual loop using "FETCH FORWARD" runs in constant space. Something along the lines of:

    flip HS.run conn $ do
        HS.sql "BEGIN"
        openReportCursor querySql params
        fetchLoop
        HS.sql "END"
  where
    fetchSql = "FETCH FORWARD 1000 FROM report_cursor"
    fetchLoop dec = do
        rows <- prepared fetchSql () def decoder
        if (V.null rows)
        then return ()
        else forM_ rows liftIO >> fetchLoop decoder
    decoder = HD.rowVector rowDecoder
    prepared sql a enc dec = HS.statement a $ HQ.Statement sql enc dec True

@domenkozar
Copy link

domenkozar commented Jan 29, 2024

In case it's helpful, I also needed a way to stream the results via IO, since I'm using a cursor not to load millions of

cursorQuery :: Int -> ByteString -> HasqlE.Params a -> HasqlD.Row b -> a -> ([b] -> App ()) -> App ()
cursorQuery batchSize query params decoder input action = do
  env <- ask
  cursor_name <- ("cursor_" <>) <<$>> replicateM 10 $ randomRIO ('a', 'z')
  Env.sqlSession $ do
    Hasql.sql "BEGIN READ ONLY"
    catchError (do 
      cursor <- Hasql.statement input (declareCursorQuery cursor_name)
      loop env cursor cursor_name
      ) (const $ do 
          Hasql.sql [i|"CLOSE ${cursor_name}"|]
          Hasql.sql "ROLLBACK"
      )
    cleanup cursor_name
  where
    cleanup cursor_name = do 
      Hasql.sql [i|"CLOSE ${cursor_name}"|]
      Hasql.sql "COMMIT"
    declareCursorQuery cursor_name =
      Hasql.Statement
        [i|DECLARE ${cursor_name} CURSOR FOR ${query}|]
        params Hasql.noResult True
    fetchQuery cursor_name = Hasql.Statement 
      [i|FETCH FORWARD ${batchSize} FROM ${cursor_name}|]
      HasqlE.noParams (HasqlD.rowList decoder) True
    loop env cursor cursor_name = do
      rows <- Hasql.statement () (fetchQuery cursor_name)
      liftIO $ Env.runApp env $ action rows
      unless (null rows) $ loop env cursor cursor_name

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

5 participants