Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

manifold.stream.graph/handle->downstreams size always grow #169

Open
defclass opened this issue Apr 15, 2019 · 2 comments
Open

manifold.stream.graph/handle->downstreams size always grow #169

defclass opened this issue Apr 15, 2019 · 2 comments

Comments

@defclass
Copy link

Hello Zach, I am not sure I encounter a memory leak problem. In my use case, we want to connect to server, and send some packet repeatly by aleph websocket implement. After connected, use two stream to transform data.

(defn- wrap-websocket-stream
  [s]
  (let [out (s/stream)]
    (s/connect (s/map json/generate-string out) s)
    (s/splice out (parse-json-stream s))))

We found var size of manifold.stream.graph/handle->downstreams will always grow.

Also, can repreduce the case to use below code.

(dotimes [i 5] 
  (let [a (s/stream)
       b (s/stream)]
   (s/connect a b)
   @(s/put! a "a")
   (s/close! b)
   (s/close! a)
   (prn (.size manifold.stream.graph/handle->downstreams))))

It's entirely possible I'm using stream incorrectly, thanks.

@ztellman
Copy link
Collaborator

ztellman commented Apr 19, 2019

This is because the pseudo-buffer inside b (which will hold onto 16384 messages that won't fit in the actual buffer) is still holding onto the message even after the stream is closed. If you add a (s/take! b) after the put!, you'll see everything clean itself up as expected.

I'm not sure if this is actually the problem you're having, as usually streams in Aleph ground out in a socket that won't exhibit this behavior, but I still think it's confusing and probably incorrect. I'm pushing a change that will fix the sample code you've provided, I'd be interested to know if it fixes the memory leak you're seeing in production.

EDIT: it seems like the timing the current behavior is relied upon in some tests, I need to dig deeper to see if that's incidental, or there's a reason for this behavior I've managed to forget. I'll update once I have more to share.

@defclass
Copy link
Author

Thanks for reply.

If you add a (s/take! b) after the put!, you'll see everything clean itself up as expected.

Yes , it is the case. We use aleph as the client to do some monitor tests, which we don't need to handle the incoming message, just send data after connected, then close the connection.
I mistakenly take it for granted , which invoking s/close! not only close the stream , but also will clean up all the resource.

Not sure if it is reasonable that manifold provide some explicit function to release resource of specific stream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants