-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: Some refactoring ideas for Storage client library #1943
base: main
Are you sure you want to change the base?
Conversation
🤖 I detect that the PR title and the commit message differ and there's only one commit. To use the PR title for the commit history, you can use Github's automerge feature with squashing, or use -- conventional-commit-lint bot |
@@ -170,6 +184,11 @@ String getWriterId(String streamWriterId) { | |||
return connectionWorker().getWriterId(); | |||
} | |||
|
|||
public void register(StreamWriter streamWriter) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the comment?
// TODO: What if we simply kept an atomic refcount in ConnectionWorker? We could also | ||
// manage the refcount in the callback below to precisely track which connections are being | ||
// used. | ||
currentConnection.getCurrentStreamWriters().add(streamWriter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this lock good enough to protect currentConnection?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're using it to protect currentConnection.getCurrentStreamWriters()
In theory we could put a lock inside of currentConnection, which would create more granular locking. However this would also cause a lot more lock/unlock activity (e.g. every call to append would have to lock at least two locks) so this change would need measurement to see if it better better.
lock.unlock(); | ||
} | ||
}); | ||
ConnectionWorker currentConnection; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I like this idea. We can reuse the connection more for the same StreamWriter.
TableSchema getUpdatedSchema(StreamWriter streamWriter) { | ||
if (getKind() == Kind.CONNECTION_WORKER) { | ||
return connectionWorker().getUpdatedSchema(); | ||
} else { | ||
return connectionWorkerPool().getUpdatedSchema(streamWriter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this breaks the promise of StreamWriter only saw updates when there is a schema update? I think we should be fine to use nano time since it is monotonic on this machine?https://screenshot.googleplex.com/3Qeo9ouZEnehgMR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think nanoTime completely works. it is monotonic, but not strictly increasing - i.e. the current code is broken if the first update has the same nanoTime as the creation time, which is completely possible.
// TODO: Do we need a global lock here? Or is it enough to just lock the StreamWriter? | ||
lock.lock(); | ||
try { | ||
currentConnection = streamWriter.getCurrentConnectionPoolConnection(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we keep multiple (at least 2) connections in order to scale up, and avoid look into the global pool?
ConnectionWorker createdOrExistingConnection = null; | ||
try { | ||
createdOrExistingConnection = | ||
createOrReuseConnectionWorker(streamWriter, currentConnection); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still need global lock here.
currentConnection = createdOrExistingConnection; | ||
streamWriter.setCurrentConnectionPoolConnection(currentConnection); | ||
// Update connection to write stream relationship. | ||
// TODO: What if we simply kept an atomic refcount in ConnectionWorker? We could also |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
refcount would be error prone as we streamwriter could be switching back and forth between connection workers meaning one worker could be recording a single stream writer multiple times if using refcount
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that would be fine. The refcount removal would happen in the done callback (below in ApiFutures.transform), so would know exactly which connection worker to decrement even if the stream writer has moved to a different stream.
Remove streamWriterToConnection and connectionToWriteStream maps, and instead store this data in the StreamWriter and ConnectionWorker objects themselves. This means that we no longer have to do a map lookup on every call to append().
Instead of using timestamps to determine which StreamWriter objects to send updated schema to, use a registration method. This way only StreamWriters that were created prior to a schema-update callback will get the updated schema (Note: this uses a static map, but could instead be done by updating the StreamWriter directly). I think this preserves the intended semantics from before, but needs a good look. Note: the timestamp approach isn't guaranteed to work, since it's possible for the time to stay the same between StreamWriter creation and the callback (Java does not guarantee that System.nanoTime() actually updates every nanosecond - it provides no guarantee on update frequency).
Some other things worth experimenting with in the future: