Skip to content

Commit

Permalink
[KYUUBI #6652] Support to list batches in descending order
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

Before we only support to list the batches in `ASC` ORDER. It is not user friendly.
## Describe Your Solution 🔧

Support the list the batches in `DESC` order.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6652 from turboFei/latest_batch.

Closes #6652

b3d80f5 [Wang, Fei] ut
dce0b22 [Wang, Fei] doc
d815ec3 [Wang, Fei] ut

Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
  • Loading branch information
turboFei committed Sep 1, 2024
1 parent 353877b commit ac7702c
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 21 deletions.
19 changes: 10 additions & 9 deletions docs/client/rest/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,15 +332,16 @@ Returns all the batches.

#### Request Parameters

| Name | Description | Type |
|:-----------|:----------------------------------------------------------------------------------------------------|:-------|
| batchType | The batch type, such as spark/flink, if no batchType is specified,<br/> return all types | String |
| batchState | The valid batch state can be one of the following:<br/> PENDING, RUNNING, FINISHED, ERROR, CANCELED | String |
| batchUser | The user name that created the batch | String |
| createTime | Return the batch that created after this timestamp | Long |
| endTime | Return the batch that ended before this timestamp | Long |
| from | The start index to fetch batches | Int |
| size | Number of batches to fetch, 100 by default | Int |
| Name | Description | Type |
|:-----------|:----------------------------------------------------------------------------------------------------|:--------|
| batchType | The batch type, such as spark/flink, if no batchType is specified,<br/> return all types | String |
| batchState | The valid batch state can be one of the following:<br/> PENDING, RUNNING, FINISHED, ERROR, CANCELED | String |
| batchUser | The user name that created the batch | String |
| createTime | Return the batch that created after this timestamp | Long |
| endTime | Return the batch that ended before this timestamp | Long |
| from | The start index to fetch batches | Int |
| size | Number of batches to fetch, 100 by default | Int |
| desc | List the batches in descending order, false by default. | Boolean |

#### Response Body

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class ControlCliArguments(args: Seq[String], env: Map[String, String] = sys.env)
| endTime ${cliConfig.batchOpts.endTime}
| from ${cliConfig.batchOpts.from}
| size ${cliConfig.batchOpts.size}
| desc ${cliConfig.batchOpts.desc}
""".stripMargin
case ControlObject.SERVER =>
s"""Parsed arguments:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class ListBatchCommand(cliConfig: CliConfig) extends Command[GetBatchesResponse]
batchOpts.createTime,
batchOpts.endTime,
if (batchOpts.from < 0) 0 else batchOpts.from,
batchOpts.size)
batchOpts.size,
batchOpts.desc)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ case class BatchOpts(
endTime: Long = 0,
from: Int = -1,
size: Int = 100,
waitCompletion: Boolean = true)
waitCompletion: Boolean = true,
desc: Boolean = false)

case class EngineOpts(
user: String = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ object CommandLine extends CommonCommandLine {
} else {
failure("Option --size must be >=0")
})
.text("The max number of records returned in the query."))
.text("The max number of records returned in the query."),
opt[Boolean]("desc")
.action((v, c) => c.copy(batchOpts = c.batchOpts.copy(desc = v)))
.text("List the batches in descending order."))
}

private def logBatchCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
"--from",
"2",
"--size",
"5")
"5",
"--desc",
"true")
val opArgs = new ControlCliArguments(args)
assert(opArgs.cliConfig.batchOpts.batchType == "spark")
assert(opArgs.cliConfig.batchOpts.batchUser == "tom")
Expand All @@ -144,6 +146,7 @@ class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
assert(opArgs.cliConfig.batchOpts.endTime == 0)
assert(opArgs.cliConfig.batchOpts.from == 2)
assert(opArgs.cliConfig.batchOpts.size == 5)
assert(opArgs.cliConfig.batchOpts.desc)
}

test("test list batch default option") {
Expand All @@ -154,6 +157,7 @@ class BatchCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
assert(opArgs.cliConfig.batchOpts.batchType == null)
assert(opArgs.cliConfig.batchOpts.from == -1)
assert(opArgs.cliConfig.batchOpts.size == 100)
assert(!opArgs.cliConfig.batchOpts.desc)
}

test("test bad list batch option - size") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ class ControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExit {
| --endTime <value> Batch end time, should be in yyyyMMddHHmmss format.
| --from <value> Specify which record to start from retrieving info.
| --size <value> The max number of records returned in the query.
| --desc <value> List the batches in descending order.
|Command: list session
|${"\t"}List all the live sessions
|Command: list server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ public GetBatchesResponse listBatches(
Long endTime,
int from,
int size) {
return listBatches(
batchType, batchUser, batchState, batchName, createTime, endTime, from, size, false);
}

public GetBatchesResponse listBatches(
String batchType,
String batchUser,
String batchState,
String batchName,
Long createTime,
Long endTime,
int from,
int size,
boolean desc) {
Map<String, Object> params = new HashMap<>();
params.put("batchType", batchType);
params.put("batchUser", batchUser);
Expand All @@ -111,6 +125,7 @@ public GetBatchesResponse listBatches(
}
params.put("from", from);
params.put("size", size);
params.put("desc", desc);
return this.getClient()
.get(API_BASE_PATH, params, GetBatchesResponse.class, client.getAuthHeader());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
@QueryParam("createTime") createTime: Long,
@QueryParam("endTime") endTime: Long,
@QueryParam("from") from: Int,
@QueryParam("size") @DefaultValue("100") size: Int): GetBatchesResponse = {
@QueryParam("size") @DefaultValue("100") size: Int,
@QueryParam("desc") @DefaultValue("false") desc: Boolean): GetBatchesResponse = {
require(
createTime >= 0 && endTime >= 0 && (endTime == 0 || createTime <= endTime),
"Invalid time range")
Expand All @@ -412,7 +413,7 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
requestName = batchName,
createTime = createTime,
endTime = endTime)
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size)
val batches = sessionManager.getBatchesFromMetadataStore(filter, from, size, desc)
new GetBatchesResponse(from, batches.size, batches.asJava)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,12 @@ class MetadataManager extends AbstractService("MetadataManager") {
.filter(_.sessionType == SessionType.BATCH)
}

def getBatches(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size)).map(
def getBatches(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Batch] = {
withMetadataRequestMetrics(_metadataStore.getMetadataList(filter, from, size, desc)).map(
buildBatch)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,14 @@ trait MetadataStore extends Closeable {
* @param filter the metadata filter conditions.
* @param from the metadata offset.
* @param size the size to get.
* @param desc the order of metadata list.
* @return selected metadata list.
*/
def getMetadataList(filter: MetadataFilter, from: Int, size: Int): Seq[Metadata]
def getMetadataList(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata]

/**
* Count the metadata list with filter conditions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,19 @@ class JDBCMetadataStore(conf: KyuubiConf) extends MetadataStore with Logging {
}
}

override def getMetadataList(filter: MetadataFilter, from: Int, size: Int): Seq[Metadata] = {
override def getMetadataList(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Metadata] = {
val queryBuilder = new StringBuilder
val params = ListBuffer[Any]()
queryBuilder.append("SELECT ")
queryBuilder.append(METADATA_COLUMNS)
queryBuilder.append(s" FROM $METADATA_TABLE")
queryBuilder.append(s" ${assembleWhereClause(filter, params)}")
queryBuilder.append(" ORDER BY key_id ")
queryBuilder.append(if (desc) "DESC " else "ASC ")
queryBuilder.append(dialect.limitClause(size, from))
val query = queryBuilder.toString
JdbcUtils.withConnection { connection =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,12 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
metadataManager.flatMap(mm => mm.getBatch(batchId))
}

def getBatchesFromMetadataStore(filter: MetadataFilter, from: Int, size: Int): Seq[Batch] = {
metadataManager.map(_.getBatches(filter, from, size)).getOrElse(Seq.empty)
def getBatchesFromMetadataStore(
filter: MetadataFilter,
from: Int,
size: Int,
desc: Boolean = false): Seq[Batch] = {
metadataManager.map(_.getBatches(filter, from, size, desc)).getOrElse(Seq.empty)
}

def getBatchMetadata(batchId: String): Option[Metadata] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,25 @@ abstract class BatchesResourceSuiteBase extends KyuubiFunSuite
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
assert(response7.getStatus === 500)

val response8 = webTarget.path("api/v1/batches")
.queryParam("from", "0")
.queryParam("size", "1")
.queryParam("desc", "false")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
val firstBatch = response8.readEntity(classOf[GetBatchesResponse]).getBatches.get(0)

val response9 = webTarget.path("api/v1/batches")
.queryParam("from", "0")
.queryParam("size", "1")
.queryParam("desc", "true")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, basicAuthorizationHeader("anonymous"))
.get()
val lastBatch = response9.readEntity(classOf[GetBatchesResponse]).getBatches.get(0)
assert(firstBatch.getCreateTime < lastBatch.getCreateTime)
}

test("negative request") {
Expand Down

0 comments on commit ac7702c

Please sign in to comment.