Skip to content

Commit

Permalink
[fix](inverted index) Modify Error Handling for File Open Failure
Browse files Browse the repository at this point in the history
  • Loading branch information
zzzxl1993 committed Nov 26, 2024
1 parent 05b48d6 commit 64a3a1e
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 5 deletions.
26 changes: 21 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -662,15 +662,28 @@ Status Compaction::do_inverted_index_compaction() {
try {
std::vector<std::unique_ptr<DorisCompoundReader>> src_idx_dirs(src_segment_num);
for (int src_segment_id = 0; src_segment_id < src_segment_num; src_segment_id++) {
src_idx_dirs[src_segment_id] =
DORIS_TRY(inverted_index_file_readers[src_segment_id]->open(index_meta));
auto res = inverted_index_file_readers[src_segment_id]->open(index_meta);
DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_reader", {
res = ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: Compaction::open_index_file_reader error"));
})
if (!res.has_value()) {
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
src_idx_dirs[src_segment_id] = std::move(res.value());
}
for (int dest_segment_id = 0; dest_segment_id < dest_segment_num; dest_segment_id++) {
auto dest_dir =
DORIS_TRY(inverted_index_file_writers[dest_segment_id]->open(index_meta));
auto res = inverted_index_file_writers[dest_segment_id]->open(index_meta);
DBUG_EXECUTE_IF("Compaction::open_inverted_index_file_writer", {
res = ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
"debug point: Compaction::open_inverted_index_file_writer error"));
})
if (!res.has_value()) {
throw Exception(ErrorCode::INVERTED_INDEX_COMPACTION_ERROR, res.error().msg());
}
// Destination directories in dest_index_dirs do not need to be deconstructed,
// but their lifecycle must be managed by inverted_index_file_writers.
dest_index_dirs[dest_segment_id] = dest_dir.get();
dest_index_dirs[dest_segment_id] = res.value().get();
}
auto st = compact_column(index_meta->index_id(), src_idx_dirs, dest_index_dirs,
index_tmp_path.native(), trans_vec, dest_segment_num_rows);
Expand All @@ -681,6 +694,9 @@ Status Compaction::do_inverted_index_compaction() {
} catch (CLuceneError& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
} catch (const Exception& e) {
error_handler(index_meta->index_id(), column_uniq_id);
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(e.what());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

import org.codehaus.groovy.runtime.IOGroovyMethods

suite("test_skip_index_compaction_fault_injection", "nonConcurrent") {
// Cache the deployment mode once; run_test uses it to pick the expected
// rowset count after a successful compaction.
def isCloudMode = isCloudMode()
// Two tables with the same schema and indexes; they differ only in the
// "inverted_index_storage_format" property (V1 vs V2).
def tableName1 = "test_skip_index_compaction_fault_injection_1"
def tableName2 = "test_skip_index_compaction_fault_injection_2"
// Maps filled in by getBackendIpHttpPort: backend id -> IP and backend id -> HTTP port.
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

// Table 1: inverted index storage format V1, auto compaction disabled at table level.
sql "DROP TABLE IF EXISTS ${tableName1}"
sql """
CREATE TABLE ${tableName1} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` varchar(20) NULL COMMENT "",
`request` text NULL COMMENT "",
`status` int(11) NULL COMMENT "",
`size` int(11) NULL COMMENT "",
INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true",
"inverted_index_storage_format" = "V1"
);
"""

// Table 2: same definition as table 1, but with inverted index storage format V2.
sql "DROP TABLE IF EXISTS ${tableName2}"
sql """
CREATE TABLE ${tableName2} (
`@timestamp` int(11) NULL COMMENT "",
`clientip` varchar(20) NULL COMMENT "",
`request` text NULL COMMENT "",
`status` int(11) NULL COMMENT "",
`size` int(11) NULL COMMENT "",
INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = "english", "support_phrase" = "true") COMMENT ''
) ENGINE=OLAP
DUPLICATE KEY(`@timestamp`)
COMMENT "OLAP"
DISTRIBUTED BY RANDOM BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true",
"inverted_index_storage_format" = "V2"
);
"""

// Read by trigger_full_compaction_on_tablets when interpreting the compaction
// result. NOTE(review): hard-coded to false even though both tables set
// "disable_auto_compaction" = "true" -- confirm this is intentional.
boolean disableAutoCompaction = false

// Applies the config `key` = `value` on every backend listed in
// backendId_to_backendIP and logs the result of each update call.
def set_be_config = { key, value ->
    for (String beId : backendId_to_backendIP.keySet()) {
        def beHost = backendId_to_backendIP.get(beId)
        def beHttpPort = backendId_to_backendHttpPort.get(beId)
        def (code, out, err) = update_be_config(beHost, beHttpPort, key, value)
        logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
    }
}

// Requests a full compaction for each tablet, retrying until the backend
// answers "success" or "e-6010", or until 10 requests have been sent.
// NOTE(review): "e-6010" is presumably the inverted-index compaction error
// produced by the injected fault -- confirm against the BE error codes.
def trigger_full_compaction_on_tablets = { tablets ->
    for (def tablet : tablets) {
        String tabletId = tablet.TabletId
        String beId = tablet.BackendId
        def beHost = backendId_to_backendIP.get(beId)
        def beHttpPort = backendId_to_backendHttpPort.get(beId)

        int attempts = 1
        String compactionStatus
        while (true) {
            def (code, out, err) = be_run_full_compaction(beHost, beHttpPort, tabletId)
            logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
            attempts++
            // Give the backend a moment before looking at the reply / retrying.
            sleep(2000)
            compactionStatus = parseJson(out.trim()).status.toLowerCase()

            boolean terminalStatus = compactionStatus == "success" || compactionStatus == "e-6010"
            if (terminalStatus || attempts > 10) {
                break
            }
        }

        // A "fail" answer is only tolerated when auto compaction is not disabled.
        if (compactionStatus == "fail") {
            assertEquals(disableAutoCompaction, false)
            logger.info("Compaction was done automatically!")
        }
        // With auto compaction disabled, anything other than "e-6010" must be "success".
        if (disableAutoCompaction && compactionStatus != "e-6010") {
            assertEquals("success", compactionStatus)
        }
    }
}

// Polls each tablet's compaction status once per second until the backend
// reports that no compaction is running (run_status is falsy). Every poll
// must return exit code 0 and a "success" status.
def wait_full_compaction_done = { tablets ->
    for (def tablet in tablets) {
        String tabletId = tablet.TabletId
        String beId = tablet.BackendId

        boolean stillRunning = true
        while (stillRunning) {
            Thread.sleep(1000)
            def (code, out, err) = be_get_compaction_status(
                    backendId_to_backendIP.get(beId), backendId_to_backendHttpPort.get(beId), tabletId)
            logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
            assertEquals(code, 0)

            def statusJson = parseJson(out.trim())
            assertEquals("success", statusJson.status.toLowerCase())
            stillRunning = statusJson.run_status
        }
    }
}

// Sums the number of rowsets reported by the CompactionStatus URL of every
// tablet in `tablets` and returns the total.
def get_rowset_count = { tablets ->
    int total = 0
    for (def tablet in tablets) {
        def (code, out, err) = curl("GET", tablet.CompactionStatus)
        logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)

        def tabletJson = parseJson(out.trim())
        assert tabletJson.rowsets instanceof List
        List<String> rowsets = (List<String>) tabletJson.rowsets
        total = total + rowsets.size()
    }
    return total
}

// Verifies on every backend that the config entry named `key` currently has
// the value `value`. The config dump is a list of lists; element 0 of an
// entry is compared against the key and element 2 against the value.
def check_config = { String key, String value ->
    for (String beId : backendId_to_backendIP.keySet()) {
        def beHost = backendId_to_backendIP.get(beId)
        def beHttpPort = backendId_to_backendHttpPort.get(beId)
        def (code, out, err) = show_be_config(beHost, beHttpPort)
        logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
        assertEquals(code, 0)

        def configList = parseJson(out.trim())
        assert configList instanceof List
        for (Object ele in (List) configList) {
            assert ele instanceof List<String>
            List<String> entry = (List<String>) ele
            if (entry[0] != key) {
                continue
            }
            assertEquals(value, entry[2])
        }
    }
}

// Loads ten single-row rowsets into `tableName`, then runs two full
// compactions and checks the rowset count after each one:
//   * before compaction and after the first compaction: 11 rowsets per replica
//     (the first compaction is expected to leave the count unchanged; callers
//     run this closure with a fault-injection debug point enabled);
//   * after the second compaction: 1 rowset per replica, or 2 in cloud mode.
def run_test = { tableName ->
    // Ten separate INSERTs so that each row lands in its own rowset.
    for (int rowId = 1; rowId <= 10; rowId++) {
        sql """ INSERT INTO ${tableName} VALUES (${rowId}, "40.135.0.0", "GET /images/hm_bg.jpg HTTP/1.0", 1, 2); """
    }

    sql "sync"

    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
    logger.info("tablets: {}", tablets)

    // Derive the replica count from the ratio of tablet rows to distinct tablets.
    int replicaNum = 1
    def dedup_tablets = deduplicate_tablets(tablets)
    if (dedup_tablets.size() > 0) {
        replicaNum = Math.round(tablets.size() / dedup_tablets.size())
        // Only single-replica and three-replica deployments are supported here.
        assert (replicaNum == 1 || replicaNum == 3)
    }

    int rowsetCount = get_rowset_count.call(tablets);
    assert (rowsetCount == 11 * replicaNum)

    // first
    trigger_full_compaction_on_tablets.call(tablets)
    wait_full_compaction_done.call(tablets)

    // Re-use the variable declared above: declaring `int rowsetCount` a second
    // time in the same scope is a Groovy compile error.
    rowsetCount = get_rowset_count.call(tablets);
    assert (rowsetCount == 11 * replicaNum)

    // second
    trigger_full_compaction_on_tablets.call(tablets)
    wait_full_compaction_done.call(tablets)

    rowsetCount = get_rowset_count.call(tablets);
    if (isCloudMode) {
        assert (rowsetCount == (1 + 1) * replicaNum)
    } else {
        assert (rowsetCount == 1 * replicaNum)
    }
}

// Test driver: remember the current value of inverted_index_compaction_enable
// (read from the first backend), force it to "true", run the scenario once per
// fault-injection debug point, and finally restore the remembered value.
boolean originalIndexCompactionEnabled = false
boolean beConfigChanged = false
try {
    String backend_id = backendId_to_backendIP.keySet()[0]
    def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

    logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
    assertEquals(code, 0)
    def configList = parseJson(out.trim())
    assert configList instanceof List

    for (Object ele in (List) configList) {
        assert ele instanceof List<String>
        if (((List<String>) ele)[0] != "inverted_index_compaction_enable") {
            continue
        }
        originalIndexCompactionEnabled = Boolean.parseBoolean(((List<String>) ele)[2])
        logger.info("inverted_index_compaction_enable: ${((List<String>) ele)[2]}")
    }

    set_be_config.call("inverted_index_compaction_enable", "true")
    beConfigChanged = true
    check_config.call("inverted_index_compaction_enable", "true");

    // Each case pairs a debug point with the table it is exercised on:
    // the reader-open failure on table 1, the writer-open failure on table 2.
    def faultCases = [
        ["Compaction::open_inverted_index_file_reader", tableName1],
        ["Compaction::open_inverted_index_file_writer", tableName2]
    ]
    for (def faultCase : faultCases) {
        String debugPoint = faultCase[0]
        String targetTable = faultCase[1]
        try {
            GetDebugPoint().enableDebugPointForAllBEs(debugPoint)
            run_test.call(targetTable)
        } finally {
            // Always switch the debug point off again, even if the scenario failed.
            GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
        }
    }
} finally {
    // Only restore the config if this suite actually changed it.
    if (beConfigChanged) {
        set_be_config.call("inverted_index_compaction_enable", originalIndexCompactionEnabled.toString())
    }
}
}

0 comments on commit 64a3a1e

Please sign in to comment.