X
    Categories: yii2

yii2 自定义 cron 机制

此 cron 优点:

1,linux crontab 只需加入 * * * * * php yii init/cron

2,  cron 通过 config 文件配置,修改配置后需执行 php yii init/crontab

3,重试机制,任务出错后将延缓5分钟执行,直到达到最大尝试次数后,标识为失败

4,兼容周期性任务、一次性任务

5,并发执行任务(通过 Guzzle 请求池同时发起多个异步 HTTP 请求,并非真正的多线程),任务不阻塞

目录结构

 

/
|-models
    |--cron
        |---Model.php
        |---Lib.php
|-commands
    |--InitController.php
|-config
    |--crontab.php

 

\models\cron\Model.php

<?php

namespace app\models\cron;

use yii\behaviors\TimestampBehavior;
use yii\db\ActiveRecord;
// Define NOW_TIME once (unless something already defined it) as the current Unix timestamp.
defined('NOW_TIME') or define('NOW_TIME', time());
/**
 * ActiveRecord for the `cron` table: one row per queued task.
 *
 * A task is a request path plus JSON-encoded parameters. create() enqueues
 * tasks, result() records the outcome of one attempt, and crontab() /
 * crontabNext() keep the periodic tasks listed in
 * Yii::$app->params['crontab'] queued.
 */
class Model extends ActiveRecord
{
    const CRON_STATUS_DEF=0;      // pending (the only status the runner selects)
    const CRON_STATUS_RUN=1;      // running (never assigned inside this class)
    const CRON_STATUS_RUN_FIL=-1; // failed after exhausting the retries
    const CRON_STATUS_RUN_OK=2;   // finished successfully

    const CRON_RETRY_COUNT=50;     // attempts after which the target is deemed unreachable
    const CRON_RETRY_TIME=300;     // seconds to wait before retrying
    const CRON_RENEW_TIMEOUT=3600; // seconds after which an unfinished task is "repaired" by create()
    const CRON_RENEW_TIMEOUT_KEY='CRON_RENEW_TIMEOUT'; // params key overriding CRON_RENEW_TIMEOUT per task

    const CRON_TIMEOUT=86400; // original note: retry is no longer the deciding factor; every failed run lowers the task's priority

    const CRON_METHOD_GET=0;
    const CRON_METHOD_POST=1;


    /**
     * @inheritdoc
     */
    public static function tableName()
    {
        return '{{%cron}}';
    }
    /**
     * @inheritdoc
     */
    public function behaviors()
    {
        return [
            TimestampBehavior::className(),
        ];
    }

    /**
     * @inheritdoc
     */
    public function rules()
    {
        return [
            [['path','hash',], 'required'],
        ];
    }

    /**
     * JSON form of a crontab entry's parameters exactly as create() stores it:
     * non-array values become '[]' and the CRON_RENEW_TIMEOUT override is removed.
     *
     * @param mixed $param parameters taken from a crontab config entry
     * @return string
     */
    private static function storedParam($param){
        if(!is_array($param)){
            return '[]';
        }
        unset($param[self::CRON_RENEW_TIMEOUT_KEY]);
        return json_encode($param);
    }

    /**
     * Enqueue a task unless an equivalent one is already queued.
     *
     * Tasks are identified by md5(path . json(params) . crontime); crontime is
     * only part of the hash when it is greater than zero.
     *  - crontime > 0: nothing happens if a row with the same hash exists.
     *  - otherwise: the newest row with the same hash decides. If it finished
     *    (succeeded or failed) a new task is created. If it is still
     *    unfinished no new task is created, and when it is older than the
     *    renew timeout its retry counter is reset and its priority raised to 3.
     *
     * @param string $path     request path of the task
     * @param array  $params   request parameters; the key CRON_RENEW_TIMEOUT,
     *                         if present, overrides the renew timeout (seconds)
     *                         and is not stored
     * @param int    $crontime earliest run time as a Unix timestamp, 0 for "as soon as possible"
     * @param int    $priority higher values are picked first by the runner
     * @return void
     */
    public static function create($path, $params=array(),$crontime=0,$priority=0){
        $renewTimeout=self::CRON_RENEW_TIMEOUT;
        if(is_array($params) && isset($params[self::CRON_RENEW_TIMEOUT_KEY])){
            $renewTimeout=$params[self::CRON_RENEW_TIMEOUT_KEY];
            unset($params[self::CRON_RENEW_TIMEOUT_KEY]);
        }
        $param=json_encode($params);
        $hash=md5($path.$param.($crontime>0?$crontime:""));
        if($crontime>0){
            // Scheduled task: the hash includes the run time, so any hit is a duplicate.
            if(self::find()->where(['hash'=>$hash])->exists()){
                return;
            }
        }else{
            $latest=self::find()
                ->select(['id','created_at','status'])
                ->where(['hash'=>$hash])
                ->orderBy(['created_at'=>SORT_DESC])
                ->one();
            if($latest){
                // Compare against BOTH final states. The previous condition
                // `status==OK || self::CRON_STATUS_RUN_FIL` was always true (-1 is truthy),
                // so duplicates were always created and the repair below never ran.
                $finished=in_array(
                    (int)$latest['status'],
                    [self::CRON_STATUS_RUN_OK,self::CRON_STATUS_RUN_FIL],
                    true
                );
                if(!$finished){
                    if(NOW_TIME-$latest['created_at']>$renewTimeout){
                        // Repair a stale task: give it a fresh retry budget and a higher priority.
                        self::updateAll([
                            'retry'=>0,
                            'priority'=>3,
                        ],['id'=>$latest['id']]);
                    }
                    return;
                }
            }
        }
        $cron=new self();
        $cron->hash=$hash;
        $cron->path=$path;
        $cron->param=$param;
        $cron->method=self::CRON_METHOD_POST;
        $cron->crontime=$crontime+0;
        $cron->priority=$priority+0;
        $cron->save();
    }

    /**
     * Record the outcome of one execution attempt.
     *
     * Every call increments `retry`. Success marks the task as done; a failure
     * lowers its priority by one and marks it failed once `retry` reaches
     * CRON_RETRY_COUNT.
     *
     * @param int    $cron_id primary key of the task; falsy values are ignored
     * @param bool   $result  whether the attempt succeeded
     * @param string $message result text, stored truncated to 250 characters
     * @return void
     */
    public static function result($cron_id,$result=true,$message=""){
        if(!$cron_id) return;
        $cron=self::findOne($cron_id);
        // findOne() returns null for an unknown id; the old `$cron->id` check dereferenced it.
        if(!$cron) return;
        $cron->updated_at=NOW_TIME;
        $cron->retry+=1;
        if($result){
            $cron->status=self::CRON_STATUS_RUN_OK;
        }else{
            if($cron->retry>=self::CRON_RETRY_COUNT){
                $cron->status=self::CRON_STATUS_RUN_FIL;
            }
            $cron->priority-=1;
            if('api/cron.syncCommodities'==$cron->path){
                // Push the failed task behind the last queued "sync commodities" task.
                $lastcron=self::find()
                    ->where(['path'=>$cron->path])
                    ->orderBy(['crontime'=>SORT_DESC])
                    ->one();
                $cron->crontime=($lastcron && $lastcron->crontime?$lastcron->crontime:time())+60;
            }
        }
        // Truncate by characters, not bytes, so a multi-byte character is never cut in half.
        $cron->message=mb_substr($message,0,250,'UTF-8');
        $cron->save();
    }

    /**
     * Synchronise the queue with Yii::$app->params['crontab'].
     *
     * For every configured entry the pending row with the same path and
     * parameters is updated (priority, crontime), or a new task is created when
     * there is none. Prints one line per entry.
     *
     * @return void
     */
    public static function crontab(){
        $crontab=\Yii::$app->params['crontab'];
        foreach ($crontab as $cron){
            list($path,$param,$crontime,$priority)=$cron;
            if(strstr($crontime,':')){
                // getDayTime() is not defined in this file; presumably a global helper that
                // turns an "HH:MM:SS" string into a Unix timestamp — TODO confirm.
                $crontime=getDayTime($crontime);
            }else{
                $crontime=0;
            }
            // Look up using the same JSON that create() stores, via bound parameters
            // instead of string-built SQL (quotes in the JSON used to break the query).
            $param_str=self::storedParam($param);
            $one=self::find()
                ->where([
                    'status'=>self::CRON_STATUS_DEF,
                    'path'=>$path,
                    'param'=>$param_str,
                ])
                ->one();
            if($one){
                //update
                $one->priority=$priority;
                $one->crontime=$crontime;
                $one->save();
                echo "update cron ".json_encode($cron)."\n";
            }else{
                self::create($path,is_array($param)?$param:[],$crontime,$priority);
                echo "new cron ".json_encode($cron)."\n";
            }
        }
    }

    /**
     * Queue the next occurrence of a periodic task that has just been run.
     *
     * The first crontab entry whose path and stored parameters equal those of
     * $croning is re-created: "HH:MM:SS" schedules go through getDayTime(),
     * numeric schedules are treated as an interval in seconds from NOW_TIME.
     *
     * @param array $croning task row with at least the keys 'path' and 'param'
     * @return void
     */
    public static function crontabNext($croning){
        $crontab=\Yii::$app->params['crontab'];
        foreach ($crontab as $cron){
            list($path,$param,$crontime,$priority)=$cron;
            if($path!=$croning['path'] || self::storedParam($param)!=$croning['param']){
                continue;
            }
            if(strstr($crontime,':')){
                $crontime=getDayTime($crontime);
            }else{
                $crontime=NOW_TIME+$crontime;
            }
            self::create($path,is_array($param)?$param:[],$crontime,$priority);
            return;
        }
    }
}

\models\cron\Lib.php

<?php
namespace app\models\cron;

use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use yii\helpers\Url;
// Define NOW_TIME once (unless something already defined it) as the current Unix timestamp.
defined('NOW_TIME') or define('NOW_TIME', time());
/**
 * Runs one batch of due cron tasks by POSTing each task's parameters to its
 * path through a Guzzle request pool.
 */
class Lib{
    private $data,$base_url;
    private $totalPageCount;
    private $counter        = 1;
    private $concurrency    = 10;  // maximum number of requests in flight at once

    /**
     * Select up to 30 due tasks and execute them concurrently.
     *
     * A task is due when it is pending, its crontime lies in the past, and it
     * was either never touched (updated_at = created_at) or last touched more
     * than CRON_RETRY_TIME seconds ago. Tasks are ordered by priority
     * (highest first), then by age. A response whose JSON body has code 200
     * counts as success and schedules the next occurrence of a periodic task;
     * anything else is recorded as a failure.
     *
     * NOTE(review): selected rows are not marked while their request is in
     * flight, so a request that outlives the cron interval looks like it could
     * be picked up again by the next run — confirm whether that matters here.
     *
     * @return void
     */
    public function implement(){
        $this->data=Model::find()
            ->andWhere([
                'and',
                ['=','status',Model::CRON_STATUS_DEF],
                ['<','crontime',NOW_TIME],
            ])
            ->andWhere([
                'or',
                ['<','updated_at',NOW_TIME-Model::CRON_RETRY_TIME],
                'updated_at=created_at',
            ])
            ->orderBy('`priority` desc,`created_at` asc')
            ->limit(30)
            ->asArray()->all();
        if(!$this->data) return;
        $this->totalPageCount=count($this->data);
        $this->base_url="http://127.0.0.1".(YII_DEBUG?'/ERP-V3/web/':'/');
        $client = new Client(['base_uri' => $this->base_url]);

        // Lazily yield one request factory per task; the generator keys (0..n-1)
        // are the $index values handed to the pool callbacks below.
        $requests = function () use ($client) {
            foreach ($this->data as $cron) {
                $uri = $cron['path'];
                $param=$cron['param']?json_decode($cron['param'],true):[];
                yield function() use ($client, $uri,$param) {
                    return $client->postAsync($uri,['form_params'=>$param]);
                };
            }
        };

        $pool = new Pool($client, $requests(), [
            'concurrency' => $this->concurrency,
            'fulfilled'   => function ($response, $index){
                $message=$response->getBody()->getContents();
                $res = json_decode($message);
                if(isset($res->code) && $res->code==200){
                    Model::result($this->data[$index]['id'],true,$message);
                    Model::crontabNext($this->data[$index]);
                }else{
                    // Strip HTML tags (e.g. from an error page) before storing the message.
                    $message=preg_replace("/<[^>]+>/","",$message);
                    Model::result($this->data[$index]['id'],false,$message);
                }
                $this->implementMessage($index,$message,isset($res->code)?$res->code:500);

            },
            'rejected' => function ($reason, $index){
                // Keep the first three lines of the failure text. The former
                // list($message[],$message[],$message[]) assigned in opposite orders on
                // PHP 5 and PHP 7+, and raised notices when fewer than three lines existed.
                $lines=array_slice(explode("\n",(string)$reason),0,3);
                $message=preg_replace("/<[^>]+>/","",implode("\n",$lines));
                Model::result($this->data[$index]['id'],false,$message);
                $this->implementMessage($index,$message);
            },
        ]);

        // Start sending the requests and block until all of them have settled.
        $promise = $pool->promise();
        $promise->wait();
    }

    /**
     * Print the outcome of one task to the console.
     *
     * @param int    $index   position of the task in the current batch
     * @param string $message response or error text
     * @param int    $code    result code reported by the task, 500 when unknown
     * @return void
     */
    function implementMessage($index,$message,$code=500){
        echo "[{$code}]\t$index:$message\n=============================\n";
    }
}

\commands\InitController.php

<?php
namespace app\commands;
use app\models\cron\Lib as cronLib;
use app\models\cron\Model;
use app\models\system\Role;
use app\models\User ;

/**
 * Console entry points of the custom cron mechanism.
 */
class InitController extends \yii\console\Controller
{
    /**
     * `yii init/cron`: run one batch of due tasks.
     */
    public function actionCron()
    {
        (new cronLib())->implement();
    }

    /**
     * `yii init/crontab`: synchronise the task queue with the crontab configuration.
     */
    public function actionCrontab()
    {
        Model::crontab();
    }
}

\config\crontab.php

<?php
/**
 * Periodic tasks.
 *
 * After adding or changing an entry, run:
 *      yii init/crontab
 */
$crontab = [
    // Entry format: ['path', ['param name' => 'param value'], 'schedule', 'priority']
    //['api/cron.flushMyAct',['params'=>'params'],"03:00:00",0], // runs every day at 03:00:00
    //['api/cron.flushMyActx',['params'=>'params'],3600,0], // runs every hour
    //['api/login',['username'=>'admin','password'=>'123456',],300,0], // runs every five minutes
    // The lines above are reference examples, do not delete them; add tasks below.
    ['api/cron.flushOrderList', [], 300, 0],   // runs every five minutes
    ['api/cron.syncFeeds', [], 7 * 86400, 0],  // runs every week
    ['api/cron.matchTrackNo', [], 600, 0],     // runs every 10 minutes
];

return ['crontab' => $crontab];

表结构

/*表: cron*/-------------

/*列信息*/-----------

Field       Type          Collation        Null    Key     Default  Extra            
----------  ------------  ---------------  ------  ------  -------  --------------  
id          int(11)       (NULL)           NO      PRI     (NULL)   auto_increment             
path        varchar(100)  utf8_general_ci  NO              (NULL)                              
param       text          utf8_general_ci  YES             (NULL)                              
method      tinyint(1)    (NULL)           YES             1                                   
header      varchar(250)  utf8_general_ci  YES             (NULL)                              
status      smallint(1)   (NULL)           YES             0                                   
retry       smallint(1)   (NULL)           YES             0                                   
created_at  int(11)       (NULL)           NO              0                                   
updated_at  int(11)       (NULL)           NO              0                                  
message     varchar(250)  utf8_general_ci  YES             (NULL)                             
crontime    int(11)       (NULL)           YES             (NULL)                              
hash        char(32)      utf8_general_ci  YES             (NULL)                              
priority    smallint(1)   (NULL)           YES             0                                   

/*索引信息*/--------------

Table   Non_unique  Key_name  Seq_in_index  Column_name   
------  ----------  --------  ------------  -----------  
cron             0  PRIMARY              1  id                              

/*DDL 信息*/------------

-- Task queue used by app\models\cron\Model: one row per task.
CREATE TABLE `cron` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  -- Request path the runner posts to, relative to its base URL.
  `path` varchar(100) NOT NULL,
  -- JSON-encoded request parameters.
  `param` text,
  -- Model::CRON_METHOD_GET (0) or Model::CRON_METHOD_POST (1).
  `method` tinyint(1) DEFAULT '1',
  `header` varchar(250) DEFAULT NULL,
  -- Model::CRON_STATUS_*: 0 pending, 1 running, 2 succeeded, -1 failed.
  `status` smallint(1) DEFAULT '0',
  -- Incremented by Model::result() on every recorded attempt.
  `retry` smallint(1) DEFAULT '0',
  `created_at` int(11) NOT NULL DEFAULT '0',
  `updated_at` int(11) NOT NULL DEFAULT '0',
  -- Result text of the last attempt, truncated by Model::result().
  `message` varchar(250) DEFAULT NULL,
  -- The runner only selects rows whose crontime is earlier than the current Unix timestamp.
  `crontime` int(11) DEFAULT NULL,
  -- md5 of path, JSON parameters and (when greater than zero) crontime; used to detect duplicates.
  `hash` char(32) DEFAULT NULL,
  -- The runner orders by priority descending, so higher values run first.
  `priority` smallint(1) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8

 

打赏
微信扫一扫,打赏作者吧~
admin :