首页> 实战笔录 >PHP开发笔记 >Hyperf Hyperf

Hyperf3.1使用异步多携程实现大批量数据按分页转移或插入

作者:小萝卜 2025-07-05 浏览 11

简介目前有20W条会员信息,我需要将这批会员的uid和另外一些参数批量插入到另外一张表中

/**
     * @desc 指定会员转移
     * @param array $params
     * @author Luobo
     */
    #[AsyncQueueMessage(pool: 'fast')]
    public function pushAgreementByMemberId(string $tempId,array $members)
    {
        logger()->info('转移会员',['tempId'=>$tempId,'members'=>$members]);

        try {
            //构造数据
            $data = $this->setPushData($tempId,$members);
            //直接插入数据
            if(!empty($data)){
                $this->repository::getModel()::insert($data);
            }
        }catch (\Throwable $e){
            logger()->error('按指定会员转移失败',['error'=>$e->getMessage(),'tempId'=>$tempId,'members'=>$members]);
        }
    }

    /**
     * @desc 按条件转移
     * @param array $params
     * @author Luobo
     */
    #[AsyncQueueMessage(pool: 'fast')]
    public function pushAgreement(string $tempId,array $params)
    {
        $pageSize = 1000;
        $concurrency = 5;

        //统计会员总数
        $totalMembers = TMember::instance()->count($params);

        if($totalMembers < 1) return false;

        //计算总页数
        $totalPages = ceil($totalMembers / $pageSize);

        //创建携程并发任务
        $parallel = new Parallel($concurrency);

        // 生成分页任务数组 [1, 2, ..., 100]
        $pages = range(1, $totalPages);

        foreach ($pages as $page) {
            $parallel->add(function () use ($params, $page ,$pageSize, $tempId) {
                $members = TMember::instance()->getMemberIds($params,$page,$pageSize);
                $this->pushAgreementByMemberId($tempId,$members);
            });
        }

        $parallel->wait();
    }

    /**
     * @param string $tempId
     * @param array $members
     * @return array
     * @author Luobo
     * @desc 去除重复数据-构造插入数据
     */
    private function setPushData(string $tempId,array $members):array
    {
        if(empty($members) || empty($tempId)) return [];

        $duplicate = $this->repository::getModel()::where('temp_id',$tempId)->whereIn('member_id',$members)->pluck('member_id')->toArray();

        //排除重复的数据
        $members = collect($members)->diff($duplicate)->values()->toArray();

        $pushData = [];
        foreach ($members as $member){
            $pushData[] = [
                'uuid' => uuid(),
                'temp_id'  => $tempId,
                'title'  => 'pamm copy LPOA',
                'model_id' => 0,
                'member_id'=> $member,
                'doc_type' => '2',
                'type' => 1,
                'status'   =>1,
                'create_status' => 1,
                'create_time' => time(),
                'update_time' => time(),
            ];
        }

        return $pushData;
    }

很赞哦! (0)

文章评论

    高端网站建设