diff --git a/torchx/schedulers/aws_batch_scheduler.py b/torchx/schedulers/aws_batch_scheduler.py index ac551ef97..897116ff0 100644 --- a/torchx/schedulers/aws_batch_scheduler.py +++ b/torchx/schedulers/aws_batch_scheduler.py @@ -170,7 +170,10 @@ def resource_from_resource_requirements( def _role_to_node_properties( - role: Role, start_idx: int, privileged: bool = False + role: Role, + start_idx: int, + privileged: bool = False, + job_role_arn: Optional[str] = None, ) -> Dict[str, object]: role.mounts += get_device_mounts(role.resource.devices) @@ -245,6 +248,8 @@ def _role_to_node_properties( "mountPoints": mount_points, "volumes": volumes, } + if job_role_arn: + container["jobRoleArn"] = job_role_arn if role.num_replicas > 1: instance_type = instance_type_from_resource(role.resource) if instance_type is not None: @@ -349,6 +354,7 @@ class AWSBatchOpts(TypedDict, total=False): privileged: bool share_id: Optional[str] priority: int + job_role_arn: Optional[str] class AWSBatchScheduler(DockerWorkspaceMixin, Scheduler[AWSBatchOpts]): @@ -498,6 +504,7 @@ def _submit_dryrun(self, app: AppDef, cfg: AWSBatchOpts) -> AppDryRunInfo[BatchJ role, start_idx=node_idx, privileged=cfg["privileged"], + job_role_arn=cfg.get("job_role_arn"), ) ) node_idx += role.num_replicas @@ -573,6 +580,11 @@ def _run_opts(self) -> runopts: "Higher number (between 0 and 9999) means higher priority. " "This will only take effect if the job queue has a scheduling policy.", ) + opts.add( + "job_role_arn", + type_=str, + help="The Amazon Resource Name (ARN) of the IAM role that the container can assume for AWS permissions.", + ) return opts def _get_job_id(self, app_id: str) -> Optional[str]: diff --git a/torchx/schedulers/test/aws_batch_scheduler_test.py b/torchx/schedulers/test/aws_batch_scheduler_test.py index 316caeaec..a74f8faf9 100644 --- a/torchx/schedulers/test/aws_batch_scheduler_test.py +++ b/torchx/schedulers/test/aws_batch_scheduler_test.py @@ -152,6 +152,15 @@ def test_submit_dryrun_tags(self, _) -> None: info.request.job_def["tags"], ) + def test_submit_dryrun_job_role_arn(self) -> None: + cfg = AWSBatchOpts({"queue": "ignored_in_test", "job_role_arn": "fizzbuzz"}) + info = create_scheduler("test").submit_dryrun(_test_app(), cfg) + node_groups = info.request.job_def["nodeProperties"]["nodeRangeProperties"] + self.assertEqual(1, len(node_groups)) + self.assertEqual( + cfg["job_role_arn"], node_groups[0]["container"]["jobRoleArn"] + ) + def test_submit_dryrun_privileged(self) -> None: cfg = AWSBatchOpts({"queue": "ignored_in_test", "privileged": True}) info = create_scheduler("test").submit_dryrun(_test_app(), cfg)