Skip to content

Commit

Permalink
v1.14.0 FIX/New
Browse files Browse the repository at this point in the history
- 修复db.Chunk功能在limit为1时只能取到一条数据,当limit为多时又会一次性取出多条数据而不是一条一条获取一条一条处理的BUG
- 新增db.ChunkWG多线程大量数据读取处理方法
  • Loading branch information
tobycroft committed Feb 18, 2024
1 parent c351882 commit dd25d0f
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 31 deletions.
93 changes: 62 additions & 31 deletions orm_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,6 @@ func (dba *Orm) _valueFromStruct(bindResult reflect.Value, field string) (v inte

// Chunk : 分块处理数据,当要处理很多数据的时候, 我不需要知道具体是多少数据, 我只需要每次取limit条数据,
// 然后不断的增加offset去取更多数据, 从而达到分块处理更多数据的目的
// TODO 后续增加 gorotine 支持, 提高批量数据处理效率, 预计需要增加获取更多链接的支持
func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) {
var page = 1
var tabname = dba.GetISession().GetIBinder().GetBindName()
Expand Down Expand Up @@ -419,36 +418,68 @@ func (dba *Orm) Chunk(limit int, callback func([]Data) error) (err error) {
return
}

// ChunkWG : ChunkWG是在Chunk的基础上,新增带有多线程性质的处理方式,理论性能将会有30%左右的提升,需要处理的数据量越大越多,比原版Chunk的提升就越明显
// 使用ChunkWG处理大量数据,请尽量保证有足够多的连接数,避免影响程序其他需要数据库的部分连接进入等待状态
func (dba *Orm) ChunkWG(limit int, callback func([]Data) error) (err error) {
//var page = 1
//var tabname = dba.GetISession().GetIBinder().GetBindName()
//prefix := dba.GetISession().GetIBinder().GetBindPrefix()
//tabname2 := strings.TrimPrefix(tabname, prefix)
//where, fields, group := dba.where, dba.fields, dba.group
//
//// 先执行一条看看是否报错, 同时设置指定的limit, offset
//dba.Table(tabname2).Limit(limit).Page(page)
//if err = dba.Select(); err != nil {
// return
//}
//result := dba.GetBindAll()
//for len(result) > 0 {
// if err = callback(result); err != nil {
// break
// }
// page++
// // 清理绑定数据, 进行下一次操作, 因为绑定数据是每一次执行的时候都会解析并保存的
// // 而第二次以后执行的, 都会再次解析并保存, 数据结构是slice, 故会累积起来
// dba.ClearBindValues()
// dba.where, dba.fields, dba.group = where, fields, group
// dba.Table(tabname2).Limit(limit).Page(page)
// if err = dba.Select(); err != nil {
// break
// }
// result = dba.GetBindAll()
//}
// ChunkWG : ChunkWG是保留Chunk的使用方法的基础上,新增多线程读取&多线程执行的方式,注意onetime_exec_thread不宜过多,推荐4,不宜过大因为采用的是盲读的方法,详情请参考github-wiki的介绍部分
// 原理与PaginatorWG方法类似,保留了事务隔离性并且加入了
func (dba *Orm) ChunkWG(onetime_exec_thread int, limit int, callback func([]Data) error) (err error) {
if onetime_exec_thread <= 0 {
onetime_exec_thread = 1
}
if onetime_exec_thread > 20 {
onetime_exec_thread = 20
}
var page = 1
tabname := dba.GetISession().GetIBinder().GetBindName()
prefix := dba.GetISession().GetIBinder().GetBindPrefix()
tabname2 := strings.TrimPrefix(tabname, prefix)
where, fields, group := dba.where, dba.fields, dba.group
dba.Table(tabname2).Limit(limit).Page(page)
if err = dba.Select(); err != nil {
return
}
result := dba.GetBindAll()
if len(result) < 1 {
return
}
continue_run := true
for continue_run {
var mp map[string][]interface{}
for i := 0; i < onetime_exec_thread; i++ {
page++
dba.ClearBindValues()
dba.where, dba.fields, dba.group = where, fields, group
dba.Table(tabname2).Limit(limit).Page(page)
sqlStr, args, err := dba.BuildSql()
if err != nil {
continue_run = false
return err
}
mp[sqlStr] = args
}
var wg sync.WaitGroup
wg.Add(len(mp))
for sqlStr, args := range mp {
go func(sql string, arg []interface{}) {
_result, _err := dba.Query(sql, arg...)
if _err != nil {
wg.Done()
continue_run = false
logger.Error(_err.Error())
return
}
if len(_result) < 1 {
wg.Done()
continue_run = false
return
}
if _err = callback(_result); _err != nil {
wg.Done()
continue_run = false
return
}
}(sqlStr, args)
}
wg.Wait()
}
return
}

Expand Down
3 changes: 3 additions & 0 deletions orm_query_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ type IOrmQuery interface {
// 例如,我们可以将处理全部 users 表数据分割成一次处理 100 条记录的小组块
// 你可以通过从闭包函数中返回 err 来终止组块的运行
Chunk(limit int, callback func([]Data) error) (err error)

// ChunkWG : ChunkWG是保留Chunk的使用方法的基础上,新增多线程读取&多线程执行的方式,注意onetime_exec_thread不宜过多,推荐4,不宜过大因为采用的是盲读的方法,详情请参考github-wiki的介绍部分
ChunkWG(onetime_exec_thread int, limit int, callback func([]Data) error) (err error)
// 跟Chunk类似,只不过callback的是传入的结构体
ChunkStruct(limit int, callback func() error) (err error)
Loop(limit int, callback func([]Data) error) (err error)
Expand Down

0 comments on commit dd25d0f

Please sign in to comment.