首页> 实战笔录 >PHP开发笔记 >Hyperf Hyperf
Hyperf3.1使用异步多携程实现大批量数据按分页转移或插入
作者:小萝卜 2025-07-05 【 Hyperf 异步 携程 】 浏览 11
简介目前有20W条会员信息,我需要将这批会员的uid和另外一些参数批量插入到另外一张表中
/**
* @desc 指定会员转移
* @param array $params
* @author Luobo
*/
#[AsyncQueueMessage(pool: 'fast')]
public function pushAgreementByMemberId(string $tempId,array $members)
{
logger()->info('转移会员',['tempId'=>$tempId,'members'=>$members]);
try {
//构造数据
$data = $this->setPushData($tempId,$members);
//直接插入数据
if(!empty($data)){
$this->repository::getModel()::insert($data);
}
}catch (\Throwable $e){
logger()->error('按指定会员转移失败',['error'=>$e->getMessage(),'tempId'=>$tempId,'members'=>$members]);
}
}
/**
* @desc 按条件转移
* @param array $params
* @author Luobo
*/
#[AsyncQueueMessage(pool: 'fast')]
public function pushAgreement(string $tempId,array $params)
{
$pageSize = 1000;
$concurrency = 5;
//统计会员总数
$totalMembers = TMember::instance()->count($params);
if($totalMembers < 1) return false;
//计算总页数
$totalPages = ceil($totalMembers / $pageSize);
//创建携程并发任务
$parallel = new Parallel($concurrency);
// 生成分页任务数组 [1, 2, ..., 100]
$pages = range(1, $totalPages);
foreach ($pages as $page) {
$parallel->add(function () use ($params, $page ,$pageSize, $tempId) {
$members = TMember::instance()->getMemberIds($params,$page,$pageSize);
$this->pushAgreementByMemberId($tempId,$members);
});
}
$parallel->wait();
}
/**
* @param string $tempId
* @param array $members
* @return array
* @author Luobo
* @desc 去除重复数据-构造插入数据
*/
private function setPushData(string $tempId,array $members):array
{
if(empty($members) || empty($tempId)) return [];
$duplicate = $this->repository::getModel()::where('temp_id',$tempId)->whereIn('member_id',$members)->pluck('member_id')->toArray();
//排除重复的数据
$members = collect($members)->diff($duplicate)->values()->toArray();
$pushData = [];
foreach ($members as $member){
$pushData[] = [
'uuid' => uuid(),
'temp_id' => $tempId,
'title' => 'pamm copy LPOA',
'model_id' => 0,
'member_id'=> $member,
'doc_type' => '2',
'type' => 1,
'status' =>1,
'create_status' => 1,
'create_time' => time(),
'update_time' => time(),
];
}
return $pushData;
}
很赞哦! (0)