Skip to content

Commit

Permalink
Expose preferred locations config (#2762)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiyuhang0 authored and ti-chi-bot committed Oct 9, 2023
1 parent 0446309 commit 01e8ac0
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 2 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ object TiConfigConst {
// health check timeout
val GRPC_HEALTH_CHECK_TIMEOUT = "spark.tispark.grpc.health_check_timeout_in_ms"
val GPRC_HEALTH_CHECK_PERIOD = "spark.tispark.grpc.health_check_period_in_ms"
// preferred locations
val PREFERRED_LOCATIONS = "spark.tispark.preferred_locations"
}
5 changes: 5 additions & 0 deletions core/src/main/scala/com/pingcap/tispark/utils/TiUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ object TiUtil {
tiConf.setHealthCheckPeriod(conf
.get(TiConfigConst.GPRC_HEALTH_CHECK_PERIOD, TiConfiguration.DEFHealthCheckPeriod.toString)
.toInt)

if (conf.contains(TiConfigConst.PREFERRED_LOCATIONS)) {
tiConf.setPreferredLocations(conf.get(TiConfigConst.PREFERRED_LOCATIONS))
}

tiConf
}

Expand Down
9 changes: 7 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/tispark/TiRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ abstract class TiRDD(
extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

private lazy val partitionPerSplit = tiConf.getPartitionPerSplit
private lazy val preferredLocations = tiConf.getPreferredLocations

protected def checkTimezone(): Unit = {
if (!tiConf.getLocalTimeZone.equals(Converter.getLocalTimezone)) {
Expand Down Expand Up @@ -80,6 +81,10 @@ abstract class TiRDD(
result.toArray
}

override protected def getPreferredLocations(split: Partition): Seq[String] =
split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil
override protected def getPreferredLocations(split: Partition): Seq[String] = {
if (preferredLocations.equalsIgnoreCase("host")) {
return split.asInstanceOf[TiPartition].tasks.head.getHost :: Nil
}
Nil
}
}
1 change: 1 addition & 0 deletions docs/userguide_3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ spark.sql("select t1.id,t2.id from spark_catalog.default.t t1 left join tidb_cat
| `spark.tispark.load_tables` | true | (experimental) Whether load all tables when we reload catalog cache. Disable it may cause table not find in scenarios where the table changes frequently. |
| `spark.tispark.grpc.health_check_timeout_in_ms` | 2000 | The timeout of health check for TiKV and TiFlash. |
| `spark.tispark.grpc.health_check_period_in_ms` | 3000 | The period duration of health check. |
| `spark.tispark.preferred_locations` | "" | The preferred locations of TiRDD partitions in TiSpark. Only `host` is available now, which takes host as preferred locations. This configuration is for forward compatibility. |

### TLS Configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public TiConfiguration setCertReloadIntervalInSeconds(String interval) {
public static final int DEFHealthCheckPeriod = 3000;
private int healthCheckTimeout = DEFHealthCheckTimeout;
private int healthCheckPeriod = DEFHealthCheckPeriod;
private String preferredLocations = "";

private static Long getTimeAsSeconds(String key) {
return Utils.timeStringAsSec(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ private SelectResponse process(RegionTask regionTask) {
}

client.addResolvedLocks(startTs, resolvedLocks);
logger.info(
String.format(
"start coprocess request to %s in region %d with timeout %s",
task.getHost(),
region.getId(),
client.getTimeout()));

Collection<RegionTask> tasks =
client.coprocess(backOffer, dagRequest, ranges, responseQueue, startTs);
if (tasks != null) {
Expand Down

0 comments on commit 01e8ac0

Please sign in to comment.