Implementation of AI Remote Worker (AI-323) (rebased) #3168
Conversation
Commits:
- …strator and aiworker
- … capabilities is used
- …through. small update to aiResults endpoint and related test update
- …ving ai capabilities
- This commit ensures that the AImodels startup error is only thrown for AIWorkers.
- This commit applies some small textual changes I noticed during my review.
LGTM. We can remove some redundant code in subsequent pull requests 👍🏻.
This commit adds a new AI remote worker node which can be used to split worker and orchestrator machines similar to how it is done on the transcoding side. Co-authored-by: kyriediculous <[email protected]> Co-authored-by: Reuben Rodrigues <[email protected]> Co-authored-by: Rafał Leszko <[email protected]> Co-authored-by: Rick Staa <[email protected]>
@@ -127,3 +132,102 @@ func ParseStepsFromModelID(modelID *string, defaultSteps float64) float64 {
	return numInferenceSteps
}

// AddAICapabilities adds AI capabilities to the node.
func (n *LivepeerNode) AddAICapabilities(caps *Capabilities) {
This function should not exist
Not following. Is this not similar to AddCapacity? You are saying it would be better to squash this into AddCapacity? Without updating tests to cover this addition to the AddCapacity function?
A new function was added because transcoding does not use capabilities in this way right now, and I tried to avoid adding complication to a function used with the remote transcoder connection that didn't need to be there.
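For context, here is a minimal sketch of what per-model capability bookkeeping on the node could look like. The type and field names (ModelConstraint, aiCapabilities) are assumptions for illustration only, and the map-based signature is a simplification; the real functions take a *Capabilities argument:

package core

import "sync"

// ModelConstraint is a hypothetical per-model constraint: whether the model
// is kept warm and how many concurrent jobs the node can run for it.
type ModelConstraint struct {
	Warm     bool
	Capacity int
}

// LivepeerNode here is a cut-down stand-in for the real node type.
type LivepeerNode struct {
	mu             sync.Mutex
	aiCapabilities map[string]map[string]*ModelConstraint // pipeline -> modelID -> constraint
}

// AddAICapabilities merges a worker's per-model constraints into the node,
// summing capacity when another GPU serves the same pipeline/modelID.
func (n *LivepeerNode) AddAICapabilities(caps map[string]map[string]*ModelConstraint) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if n.aiCapabilities == nil {
		n.aiCapabilities = make(map[string]map[string]*ModelConstraint)
	}
	for pipeline, models := range caps {
		if n.aiCapabilities[pipeline] == nil {
			n.aiCapabilities[pipeline] = make(map[string]*ModelConstraint)
		}
		for modelID, c := range models {
			if existing, ok := n.aiCapabilities[pipeline][modelID]; ok {
				existing.Capacity += c.Capacity
			} else {
				n.aiCapabilities[pipeline][modelID] = &ModelConstraint{Warm: c.Warm, Capacity: c.Capacity}
			}
		}
	}
}

// RemoveAICapabilities reverses AddAICapabilities when a worker disconnects.
func (n *LivepeerNode) RemoveAICapabilities(caps map[string]map[string]*ModelConstraint) {
	n.mu.Lock()
	defer n.mu.Unlock()
	for pipeline, models := range caps {
		for modelID, c := range models {
			if existing, ok := n.aiCapabilities[pipeline][modelID]; ok {
				existing.Capacity -= c.Capacity
				if existing.Capacity <= 0 {
					delete(n.aiCapabilities[pipeline], modelID)
				}
			}
		}
	}
}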
}

// RemoveAICapabilities removes AI capabilities from the node.
func (n *LivepeerNode) RemoveAICapabilities(caps *Capabilities) {
This function should not exist
Not following. Is this not similar to RemoveCapacity? You are saying it would be better to squash this into RemoveCapacity? Without updating tests to cover this addition to the RemoveCapacity function?
A new function was added because transcoding does not use capabilities in this way right now, and I tried to avoid adding complication to a function used with the remote transcoder connection that didn't need to be there.
	return fmt.Errorf("failed to reserve AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
}

func (n *LivepeerNode) ReleaseAICapability(pipeline string, modelID string) error {
This function should not exist
Not following, please explain. The intent here was to start managing capacity for each pipeline/modelID and also to work with workers having multiple GPUs serving the same pipeline/modelID behind one AI worker.
	}
}

func (n *LivepeerNode) ReserveAICapability(pipeline string, modelID string) error {
This function should not exist
Not following, please explain. The intent here was to start managing capacity for each pipeline/modelID and also to work with workers having multiple GPUs serving the same pipeline/modelID behind one AI worker. A sketch of the idea is below.
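To illustrate the intent, here is a minimal sketch of the reserve/release accounting, extending the hypothetical LivepeerNode and aiCapabilities map from the sketch above (add "fmt" to its imports). This is an illustration of the idea, not the PR's actual code:

// ReserveAICapability takes one unit of capacity for a pipeline/modelID
// pair before dispatching a job to a remote AI worker.
func (n *LivepeerNode) ReserveAICapability(pipeline string, modelID string) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	models, ok := n.aiCapabilities[pipeline]
	if !ok {
		return fmt.Errorf("failed to reserve AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
	}
	c, ok := models[modelID]
	if !ok || c.Capacity == 0 {
		return fmt.Errorf("no capacity available pipeline=%v modelID=%v", pipeline, modelID)
	}
	c.Capacity-- // one GPU slot is now busy with this job
	return nil
}

// ReleaseAICapability returns the unit of capacity once the job finishes,
// so a worker with multiple GPUs for the same model can run parallel jobs.
func (n *LivepeerNode) ReleaseAICapability(pipeline string, modelID string) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	models, ok := n.aiCapabilities[pipeline]
	if !ok {
		return fmt.Errorf("failed to release AI capability capacity, pipeline does not exist pipeline=%v modelID=%v", pipeline, modelID)
	}
	c, ok := models[modelID]
	if !ok {
		return fmt.Errorf("failed to release AI capability capacity, model does not exist pipeline=%v modelID=%v", pipeline, modelID)
	}
	c.Capacity++
	return nil
}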
	}
}

type RemoteAIWorkerManager struct {
Wrong file structure
Most AI-related things are in separate files, right? Can you help me understand why using ai_worker.go files does not help keep development on AI and transcoding from causing issues for anyone developing on one or the other?
// Called by the aiworker to register to an orchestrator. The orchestrator
// notifies registered aiworkers of jobs as they come in.
rpc RegisterAIWorker(RegisterAIWorkerRequest) returns (stream NotifyAIJob);
RegisterAIWorker request doesn't need to be a newly defined type
A new RegisterAIWorkerRequest was added because the generic capacity field is not helpful when trying to manage GPUs for AI jobs, in my opinion. For transcoding, one GPU can do multiple requests at a time and there was one job type. For AI, my experience is that most models slow down significantly when more than one request is fed to them concurrently.
Do you think a generic capacity field set at launch of the ai-worker would let the orchestrator appropriately manage the AI workers?
Do you think that AI workers and remote transcoders would always have the same requirements when connecting to the orchestrator?
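For illustration, a registration payload that carries per-model capacity rather than a single generic capacity number might look roughly like the Go structs below; the names and fields are hypothetical, and the actual proto message may differ:

package core

// AIModelCapacity is a hypothetical per-model entry: for AI jobs, capacity
// is tracked per pipeline/modelID because most models slow down when they
// process more than one request concurrently.
type AIModelCapacity struct {
	Pipeline string // e.g. "text-to-image"
	ModelID  string // model identifier served by the worker
	Capacity int    // concurrent jobs this worker can run for the model
}

// RegisterAIWorkerRequest sketches a registration message richer than a
// single generic capacity field.
type RegisterAIWorkerRequest struct {
	Secret       string            // shared secret used to authenticate
	Capabilities []AIModelCapacity // one entry per pipeline/model served
}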
"github.com/livepeer/lpms/ffmpeg" | ||
) | ||
|
||
var ErrRemoteWorkerTimeout = errors.New("Remote worker took too long") |
Why is this exported?
I followed how other similar errors are implemented in the core package. The linked error and the errors above could probably be changed to not be exported.
go-livepeer/core/orchestrator.go (line 852 in 4a66b22):
var ErrRemoteTranscoderTimeout = errors.New("Remote transcoder took too long")
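For comparison, one way to keep such sentinel errors unexported while still letting other packages detect them would be a sketch like this (an assumed pattern, not the PR's code):

package core

import (
	"errors"
	"fmt"
)

// Unexported sentinel: only this package can reference it by name.
var errRemoteWorkerTimeout = errors.New("remote worker took too long")

// processJob is a hypothetical call site that wraps the sentinel so the
// cause survives up the call stack.
func processJob(jobID string) error {
	// ... dispatch to a remote worker; on timeout:
	return fmt.Errorf("job %s: %w", jobID, errRemoteWorkerTimeout)
}

// IsRemoteWorkerTimeout lets callers outside the package test for the
// condition without the package exporting the sentinel itself.
func IsRemoteWorkerTimeout(err error) bool {
	return errors.Is(err, errRemoteWorkerTimeout)
}

Callers would then check IsRemoteWorkerTimeout(err) instead of comparing against an exported variable.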
)

var ErrRemoteWorkerTimeout = errors.New("Remote worker took too long")
var ErrNoCompatibleWorkersAvailable = errors.New("no workers can process job requested")
Why is this exported?
I followed how other similar errors are implemented in the core package. The linked error and the errors above could probably be changed to not be exported.
go-livepeer/core/orchestrator.go (line 852 in 4a66b22):
var ErrRemoteTranscoderTimeout = errors.New("Remote transcoder took too long")
var ErrRemoteWorkerTimeout = errors.New("Remote worker took too long")
var ErrNoCompatibleWorkersAvailable = errors.New("no workers can process job requested")
var ErrNoWorkersAvailable = errors.New("no workers available")
Why is this exported?
I followed how other similar errors are implemented in the core package. The linked error and the errors above could probably be changed to not be exported.
go-livepeer/core/orchestrator.go (line 852 in 4a66b22):
var ErrRemoteTranscoderTimeout = errors.New("Remote transcoder took too long")
This commit adds a new AI remote worker node which can be used to split worker and orchestrator machines similar to how it is done on the transcoding side. Co-authored-by: Rafał Leszko <[email protected]> Co-authored-by: Rick Staa <[email protected]>
What does this pull request do? Explain your changes. (required)
See PR 3088, rebased to the current ai-video branch and adding the segment anything 2 pipeline. This was a community effort in the same way PR 3088 was, with contributions from multiple contributors in the ecosystem, which I am very grateful for. Detail from the commits of PR 3088 was squashed in the rebase to make it cleaner and easier to complete. Credit for the contributions to implementing the remote AI worker will be included on the squashed commit on PR approval.

Specific updates (required)

core and server parts of the ai-worker additions.

How did you test each of these updates (required)

The current ai-worker has been used on the subnet by myself and Pon. I serve I2I, T2I, I2V and upscale models using 5-6 separate ai-workers on mainnet and have completed over 5,000 requests.
Checklist:
- make runs successfully
- ./test.sh pass (tests in core/ai_test.go and server/ai_worker_test.go pass)