Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](suites) Fix syncer ingest binlog with multiple replicas #44444

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,13 @@ class Syncer {
// step 3.2: get partition/indexId/tabletId
partitionSQl += "/" + meta.indexId.toString()
sqlInfo = sendSql.call(partitionSQl, toSrc)
Map<Long, Long> replicaMap = Maps.newHashMap()
for (List<Object> row : sqlInfo) {
meta.tabletMeta.put(row[0] as Long, row[2] as Long)
Long tabletId = row[0] as Long
if (!meta.tabletMeta.containsKey(tabletId)) {
meta.tabletMeta.put(tabletId, new TabletMeta())
}
meta.tabletMeta[tabletId].replicas.put(row[1] as Long, row[2] as Long)
}
if (meta.tabletMeta.isEmpty()) {
logger.error("Target cluster get (partitionId/indexId)-(${info.key}/${meta.indexId}) tabletIds fault.")
Expand Down Expand Up @@ -816,49 +821,57 @@ class Syncer {
while (srcTabletIter.hasNext()) {
Entry srcTabletMap = srcTabletIter.next()
Entry tarTabletMap = tarTabletIter.next()
TabletMeta srcTabletMeta = srcTabletMap.value
TabletMeta tarTabletMeta = tarTabletMap.value

Iterator srcReplicaIter = srcTabletMeta.replicas.iterator()
Iterator tarReplicaIter = tarTabletMeta.replicas.iterator()
while (srcReplicaIter.hasNext()) {
Entry srcReplicaMap = srcReplicaIter.next()
Entry tarReplicaMap = tarReplicaIter.next()
BackendClientImpl srcClient = context.sourceBackendClients.get(srcReplicaMap.value)
if (srcClient == null) {
logger.error("Can't find src tabletId-${srcReplicaMap.key} -> beId-${srcReplicaMap.value}")
return false
}
BackendClientImpl tarClient = context.targetBackendClients.get(tarReplicaMap.value)
if (tarClient == null) {
logger.error("Can't find target tabletId-${tarReplicaMap.key} -> beId-${tarReplicaMap.value}")
return false
}

BackendClientImpl srcClient = context.sourceBackendClients.get(srcTabletMap.value)
if (srcClient == null) {
logger.error("Can't find src tabletId-${srcTabletMap.key} -> beId-${srcTabletMap.value}")
return false
}
BackendClientImpl tarClient = context.targetBackendClients.get(tarTabletMap.value)
if (tarClient == null) {
logger.error("Can't find target tabletId-${tarTabletMap.key} -> beId-${tarTabletMap.value}")
return false
}

tarPartition.value.version = srcPartition.value.version
long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion

TIngestBinlogRequest request = new TIngestBinlogRequest()
TUniqueId uid = new TUniqueId(-1, -1)
request.setTxnId(txnId)
request.setRemoteTabletId(srcTabletMap.key)
request.setBinlogVersion(version)
request.setRemoteHost(srcClient.address.hostname)
request.setRemotePort(srcClient.httpPort.toString())
request.setPartitionId(partitionId)
request.setLocalTabletId(tarTabletMap.key)
request.setLoadId(uid)
logger.info("request -> ${request}")
TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
if (!checkIngestBinlog(result)) {
logger.error("Ingest binlog error! result: ${result}")
return false
}
tarPartition.value.version = srcPartition.value.version
long partitionId = fakePartitionId == -1 ? tarPartition.key : fakePartitionId
long version = fakeVersion == -1 ? partitionRecord.version : fakeVersion

TIngestBinlogRequest request = new TIngestBinlogRequest()
TUniqueId uid = new TUniqueId(-1, -1)
request.setTxnId(txnId)
request.setRemoteTabletId(srcTabletMap.key)
request.setBinlogVersion(version)
request.setRemoteHost(srcClient.address.hostname)
request.setRemotePort(srcClient.httpPort.toString())
request.setPartitionId(partitionId)
request.setLocalTabletId(tarTabletMap.key)
request.setLoadId(uid)
logger.info("request -> ${request}")
TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
if (!checkIngestBinlog(result)) {
logger.error("Ingest binlog error! result: ${result}")
return false
}

if (context.txnInsert) {
List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
if (tabletCommitInfos == null) {
tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
subTxnIdToTableId.put(txnId, tarTableMeta.id)
if (context.txnInsert) {
List<TTabletCommitInfo> tabletCommitInfos = subTxnIdToTabletCommitInfos.get(txnId)
if (tabletCommitInfos == null) {
tabletCommitInfos = new ArrayList<TTabletCommitInfo>()
subTxnIdToTabletCommitInfos.put(txnId, tabletCommitInfos)
subTxnIdToTableId.put(txnId, tarTableMeta.id)
}
tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarReplicaMap.value))
} else {
addCommitInfo(tarTabletMap.key, tarReplicaMap.value)
}
tabletCommitInfos.add(new TTabletCommitInfo(tarTabletMap.key, tarTabletMap.value))
} else {
addCommitInfo(tarTabletMap.key, tarTabletMap.value)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,22 @@ import groovy.util.logging.Slf4j

import java.sql.Connection

class TabletMeta {
public TreeMap<Long, Long> replicas

TabletMeta() {
this.replicas = new TreeMap<Long, Long>()
}

String toString() {
return "TabletMeta: { replicas: " + replicas.toString() + " }"
}
}

class PartitionMeta {
public long version
public long indexId
public TreeMap<Long, Long> tabletMeta
public TreeMap<Long, TabletMeta> tabletMeta

PartitionMeta(long indexId, long version) {
this.indexId = indexId
Expand Down Expand Up @@ -219,6 +231,19 @@ class SyncerContext {
} else if (srcTabletMeta.size() != tarTabletMeta.size()) {
return false
}

Iterator srcTabletIter = srcTabletMeta.iterator()
Iterator tarTabletIter = tarTabletMeta.iterator()
while (srcTabletIter.hasNext()) {
Map srcReplicaMap = srcTabletIter.next().value.replicas
Map tarReplicaMap = tarTabletIter.next().value.replicas

if (srcReplicaMap.isEmpty() || tarReplicaMap.isEmpty()) {
return false
} else if (srcReplicaMap.size() != tarReplicaMap.size()) {
return false
}
}
}
})

Expand Down
36 changes: 18 additions & 18 deletions regression-test/suites/ccr_mow_syncer_p0/test_ingest_binlog.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,36 @@ suite("test_mow_ingest_binlog") {
def test_num = 0
sql "DROP TABLE IF EXISTS ${tableName}"
sql """
CREATE TABLE if NOT EXISTS ${tableName}
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
sql """ALTER TABLE ${tableName} set ("binlog.enable" = "true")"""

target_sql "DROP TABLE IF EXISTS ${tableName}"
target_sql """
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
CREATE TABLE if NOT EXISTS ${tableName}
(
`test` INT,
`id` INT
)
ENGINE=OLAP
UNIQUE KEY(`test`, `id`)
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"enable_unique_key_merge_on_write" = "true",
"replication_allocation" = "tag.location.default: 1"
)
"""
assertTrue(syncer.getTargetMeta("${tableName}"))


Expand Down Expand Up @@ -124,4 +124,4 @@ suite("test_mow_ingest_binlog") {

// End Test 2
syncer.closeBackendClients()
}
}
Loading