[fix](thrift)cancel thrift msg max size limit (apache#25194)
On Thrift 0.14.0+, we need to use TConfiguration to raise the max message size.
See https://github.com/apache/arrow/pull/11123/files
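For reference, a minimal sketch of the mechanism (using the same Thrift 0.14+ C++ API as the diff below; the helper name is illustrative): a default-constructed TConfiguration caps messages at 100MB, and passing a raised limit into the transport lifts the cap.

#include <thrift/TConfiguration.h>
#include <thrift/transport/TBufferTransports.h>

#include <limits>
#include <memory>

// Build a TMemoryBuffer over an existing byte buffer with the thrift
// message size cap effectively removed. Without the explicit
// TConfiguration, Thrift 0.14+ rejects messages over its 100MB default.
std::shared_ptr<apache::thrift::transport::TMemoryBuffer> make_unbounded_buffer(
        const uint8_t* buf, uint32_t len) {
    auto conf = std::make_shared<apache::thrift::TConfiguration>();
    conf->setMaxMessageSize(std::numeric_limits<int>::max());
    // OBSERVE: borrow the caller's buffer rather than copying or owning it.
    return std::make_shared<apache::thrift::transport::TMemoryBuffer>(
            const_cast<uint8_t*>(buf), len,
            apache::thrift::transport::TMemoryBuffer::OBSERVE, conf);
}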
wsjz authored Oct 13, 2023
1 parent 6bb0c91 commit de03c15
Showing 4 changed files with 386 additions and 1 deletion.
8 changes: 7 additions & 1 deletion be/src/util/thrift_util.h
@@ -135,8 +135,14 @@ Status deserialize_thrift_msg(const uint8_t* buf, uint32_t* len, bool compact,
 // Deserialize msg bytes into c++ thrift msg using memory
 // transport. TMemoryBuffer is not const-safe, although we use it in
 // a const-safe way, so we have to explicitly cast away the const.
+auto conf = std::make_shared<apache::thrift::TConfiguration>();
+// On Thrift 0.14.0+, TConfiguration must be used to raise the max message size.
+// The max message size defaults to 100MB, so make it effectively unlimited.
+conf->setMaxMessageSize(std::numeric_limits<int>::max());
 std::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
-        new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
+        new apache::thrift::transport::TMemoryBuffer(
+                const_cast<uint8_t*>(buf), *len,
+                apache::thrift::transport::TMemoryBuffer::OBSERVE, conf));
 std::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
         create_deserialize_protocol(tmem_transport, compact);

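The OBSERVE policy used above is one of TMemoryBuffer's three ownership modes; for context, a brief sketch of the upstream Thrift API (not part of this commit):

#include <thrift/transport/TBufferTransports.h>

using apache::thrift::transport::TMemoryBuffer;

void memory_policy_overview(uint8_t* data, uint32_t len) {
    // OBSERVE: wrap the caller's buffer; the caller keeps ownership, so the
    // buffer must outlive the transport (the case in the diff above).
    TMemoryBuffer observe(data, len, TMemoryBuffer::OBSERVE);
    // COPY: take a private copy up front; safe even if the caller frees data.
    TMemoryBuffer copy(data, len, TMemoryBuffer::COPY);
    // TAKE_OWNERSHIP: the transport frees the buffer when it is destroyed.
    TMemoryBuffer own(data, len, TMemoryBuffer::TAKE_OWNERSHIP);
}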
258 changes: 258 additions & 0 deletions ddl/parquet_large_metadata_100mb_create.sql
@@ -0,0 +1,258 @@
CREATE TABLE IF NOT EXISTS parquet_large_metadata_100mb
(
`0` BIGINT,
`1` BIGINT,
`2` BIGINT,
`3` BIGINT,
`4` BIGINT,
`5` BIGINT,
`6` BIGINT,
`7` BIGINT,
`8` BIGINT,
`9` BIGINT,
`10` BIGINT,
`11` BIGINT,
`12` BIGINT,
`13` BIGINT,
`14` BIGINT,
`15` BIGINT,
`16` BIGINT,
`17` BIGINT,
`18` BIGINT,
`19` BIGINT,
`20` BIGINT,
`21` BIGINT,
`22` BIGINT,
`23` BIGINT,
`24` BIGINT,
`25` BIGINT,
`26` BIGINT,
`27` BIGINT,
`28` BIGINT,
`29` BIGINT,
`30` BIGINT,
`31` BIGINT,
`32` BIGINT,
`33` BIGINT,
`34` BIGINT,
`35` BIGINT,
`36` BIGINT,
`37` BIGINT,
`38` BIGINT,
`39` BIGINT,
`40` BIGINT,
`41` BIGINT,
`42` BIGINT,
`43` BIGINT,
`44` BIGINT,
`45` BIGINT,
`46` BIGINT,
`47` BIGINT,
`48` BIGINT,
`49` BIGINT,
`50` BIGINT,
`51` BIGINT,
`52` BIGINT,
`53` BIGINT,
`54` BIGINT,
`55` BIGINT,
`56` BIGINT,
`57` BIGINT,
`58` BIGINT,
`59` BIGINT,
`60` BIGINT,
`61` BIGINT,
`62` BIGINT,
`63` BIGINT,
`64` BIGINT,
`65` BIGINT,
`66` BIGINT,
`67` BIGINT,
`68` BIGINT,
`69` BIGINT,
`70` BIGINT,
`71` BIGINT,
`72` BIGINT,
`73` BIGINT,
`74` BIGINT,
`75` BIGINT,
`76` BIGINT,
`77` BIGINT,
`78` BIGINT,
`79` BIGINT,
`80` BIGINT,
`81` BIGINT,
`82` BIGINT,
`83` BIGINT,
`84` BIGINT,
`85` BIGINT,
`86` BIGINT,
`87` BIGINT,
`88` BIGINT,
`89` BIGINT,
`90` BIGINT,
`91` BIGINT,
`92` BIGINT,
`93` BIGINT,
`94` BIGINT,
`95` BIGINT,
`96` BIGINT,
`97` BIGINT,
`98` BIGINT,
`99` BIGINT,
`100` BIGINT,
`101` BIGINT,
`102` BIGINT,
`103` BIGINT,
`104` BIGINT,
`105` BIGINT,
`106` BIGINT,
`107` BIGINT,
`108` BIGINT,
`109` BIGINT,
`110` BIGINT,
`111` BIGINT,
`112` BIGINT,
`113` BIGINT,
`114` BIGINT,
`115` BIGINT,
`116` BIGINT,
`117` BIGINT,
`118` BIGINT,
`119` BIGINT,
`120` BIGINT,
`121` BIGINT,
`122` BIGINT,
`123` BIGINT,
`124` BIGINT,
`125` BIGINT,
`126` BIGINT,
`127` BIGINT,
`128` BIGINT,
`129` BIGINT,
`130` BIGINT,
`131` BIGINT,
`132` BIGINT,
`133` BIGINT,
`134` BIGINT,
`135` BIGINT,
`136` BIGINT,
`137` BIGINT,
`138` BIGINT,
`139` BIGINT,
`140` BIGINT,
`141` BIGINT,
`142` BIGINT,
`143` BIGINT,
`144` BIGINT,
`145` BIGINT,
`146` BIGINT,
`147` BIGINT,
`148` BIGINT,
`149` BIGINT,
`150` BIGINT,
`151` BIGINT,
`152` BIGINT,
`153` BIGINT,
`154` BIGINT,
`155` BIGINT,
`156` BIGINT,
`157` BIGINT,
`158` BIGINT,
`159` BIGINT,
`160` BIGINT,
`161` BIGINT,
`162` BIGINT,
`163` BIGINT,
`164` BIGINT,
`165` BIGINT,
`166` BIGINT,
`167` BIGINT,
`168` BIGINT,
`169` BIGINT,
`170` BIGINT,
`171` BIGINT,
`172` BIGINT,
`173` BIGINT,
`174` BIGINT,
`175` BIGINT,
`176` BIGINT,
`177` BIGINT,
`178` BIGINT,
`179` BIGINT,
`180` BIGINT,
`181` BIGINT,
`182` BIGINT,
`183` BIGINT,
`184` BIGINT,
`185` BIGINT,
`186` BIGINT,
`187` BIGINT,
`188` BIGINT,
`189` BIGINT,
`190` BIGINT,
`191` BIGINT,
`192` BIGINT,
`193` BIGINT,
`194` BIGINT,
`195` BIGINT,
`196` BIGINT,
`197` BIGINT,
`198` BIGINT,
`199` BIGINT,
`200` BIGINT,
`201` BIGINT,
`202` BIGINT,
`203` BIGINT,
`204` BIGINT,
`205` BIGINT,
`206` BIGINT,
`207` BIGINT,
`208` BIGINT,
`209` BIGINT,
`210` BIGINT,
`211` BIGINT,
`212` BIGINT,
`213` BIGINT,
`214` BIGINT,
`215` BIGINT,
`216` BIGINT,
`217` BIGINT,
`218` BIGINT,
`219` BIGINT,
`220` BIGINT,
`221` BIGINT,
`222` BIGINT,
`223` BIGINT,
`224` BIGINT,
`225` BIGINT,
`226` BIGINT,
`227` BIGINT,
`228` BIGINT,
`229` BIGINT,
`230` BIGINT,
`231` BIGINT,
`232` BIGINT,
`233` BIGINT,
`234` BIGINT,
`235` BIGINT,
`236` BIGINT,
`237` BIGINT,
`238` BIGINT,
`239` BIGINT,
`240` BIGINT,
`241` BIGINT,
`242` BIGINT,
`243` BIGINT,
`244` BIGINT,
`245` BIGINT,
`246` BIGINT,
`247` BIGINT,
`248` BIGINT,
`249` BIGINT
)
DISTRIBUTED BY HASH(`1`, `2`)
PROPERTIES
(
"replication_allocation" = "tag.location.default: 1"
);
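Each of the 250 columns contributes one ColumnMetaData entry per row group to the parquet footer, which is what pushes the file's thrift-encoded metadata past the default 100MB cap. A sketch for confirming a file's footer shape, assuming Apache Arrow/Parquet C++ and a local copy of the file (name illustrative):

#include <arrow/io/file.h>
#include <parquet/file_reader.h>

#include <iostream>

int main() {
    // Footer size scales with columns x row groups; print both dimensions.
    auto infile = arrow::io::ReadableFile::Open(
            "parquet_large_metadata_100mb.parquet").ValueOrDie();
    auto reader = parquet::ParquetFileReader::Open(infile);
    auto md = reader->metadata();
    std::cout << "columns=" << md->num_columns()
              << " row_groups=" << md->num_row_groups()
              << " rows=" << md->num_rows() << std::endl;
    return 0;
}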
1 change: 1 addition & 0 deletions ddl/parquet_large_metadata_100mb_drop.sql
@@ -0,0 +1 @@
DROP TABLE IF EXISTS parquet_large_metadata_100mb;
120 changes: 120 additions & 0 deletions test_parquet_large_metadata_load_p2.groovy
@@ -0,0 +1,120 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_parquet_large_metadata_load_p2", "p2") {

def tables = ["parquet_large_metadata_100mb" // metadata size more than 100MB
]
def paths = ["s3://doris-build-1308700295/regression/load/metadata/parquet_large_metadata_100mb.parquet"
]
String ak = getS3AK()
String sk = getS3SK()
String enabled = context.config.otherConfigs.get("enableBrokerLoad")

def expect_tvf_result = """[[2, 8], [2, 8], [2, 8], [2, 8], [2, 8]]"""
String[][] tvf_result = sql """select `1`,`2` from s3(
"uri" = "https://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/load/metadata/parquet_large_metadata_100mb.parquet",
"s3.access_key" = "$ak",
"s3.secret_key" = "$sk",
"s3.region" = "ap-beijing",
"format" = "parquet"
) order by `1`,`2` limit 5;
"""
assertTrue("$tvf_result" == "$expect_tvf_result")

def do_load_job = { uuid, path, table ->
sql """
LOAD LABEL $uuid (
APPEND
DATA INFILE("$path")
INTO TABLE $table
FORMAT AS "PARQUET"
)
WITH S3 (
"AWS_ACCESS_KEY" = "$ak",
"AWS_SECRET_KEY" = "$sk",
"AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
"AWS_REGION" = "ap-beijing"
)
PROPERTIES
(
"strict_mode"="true"
);
"""
logger.info("Submit load with lable: $uuid, table: $table, path: $path")
}

def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=45000"]
def task_info = ["cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0"]
def error_msg = [""]
// test unified load
if (enabled != null && enabled.equalsIgnoreCase("true")) {
def uuids = []
try {
def i = 0

for (String table in tables) {
sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text

def uuid = UUID.randomUUID().toString().replace("-", "0")
uuids.add(uuid)
do_load_job.call(uuid, paths[i], table)
i++
}

i = 0
for (String label in uuids) {
def max_try_milli_secs = 600000
while (max_try_milli_secs > 0) {
String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
if (result[0][2].equals("FINISHED")) {
logger.info("Load FINISHED " + label)
assertTrue(result[0][6].contains(task_info[i]))
assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5] + ", label: $label")
break;
}
if (result[0][2].equals("CANCELLED")) {
assertTrue(result[0][6].contains(task_info[i]))
assertTrue(result[0][7].contains(error_msg[i]))
break;
}
Thread.sleep(1000)
max_try_milli_secs -= 1000
if(max_try_milli_secs <= 0) {
assertTrue(false, "load timeout: $label")
}
}
i++
}

def expect_result = """[[45000]]"""

for (String table in tables) {
if (table.matches("parquet_large_metadata_100mb")) {
String[][] actual_result = sql """select count(*) from parquet_large_metadata_100mb;"""
assertTrue("$actual_result" == "$expect_result")
}
}

} finally {
for (String table in tables) {
sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
}
}
}
}
