此 cron 优点:
1,linux crontab 只需加入 ***** php yii init/cron
2, cron 通过 config 文件配置,修改配置后需执行 php yii init/crontab
3,重试机制,任务出错后将延缓5分钟执行,直到达到最大尝试次数后,标识为失败
4,兼容周期性任务、一次性任务
5,多线程执行任务,任务不阻塞
目录结构
1 2 3 4 5 6 7 8 9 |
/ |-models |--cron |---Model.php |---Lib.php |-commands |--InitController.php |-config |--crontab.php |
\models\cron\Model.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
<?php namespace app\models\cron; use yii\behaviors\TimestampBehavior; use yii\db\ActiveRecord; if(!defined('NOW_TIME')){ define('NOW_TIME',time()); } class Model extends ActiveRecord { const CRON_STATUS_DEF=0; const CRON_STATUS_RUN=1; const CRON_STATUS_RUN_FIL=-1; const CRON_STATUS_RUN_OK=2; const CRON_RETRY_COUNT=50;//断定不可访问 const CRON_RETRY_TIME=300;//重试时间 const CRON_RENEW_TIMEOUT=3600;//新建任务时间周期 const CRON_RENEW_TIMEOUT_KEY='CRON_RENEW_TIMEOUT'; const CRON_TIMEOUT=86400;//不再使用retry进行判断,每次执行错误,对cron降低优先级(priority) const CRON_METHOD_GET=0; const CRON_METHOD_POST=1; /** * @inheritdoc */ public static function tableName() { return '{{%cron}}'; } /** * @inheritdoc */ public function behaviors() { return [ TimestampBehavior::className(), ]; } /** * @inheritdoc */ public function rules() { return [ [['path','hash',], 'required'], ]; } static function create($path, $params=array(),$crontime=0,$priority=0){ $CRON_RENEW_TIMEOUT=Model::CRON_RENEW_TIMEOUT; if(isset($params['CRON_RENEW_TIMEOUT'])){ $CRON_RENEW_TIMEOUT=$params['CRON_RENEW_TIMEOUT']; unset($params['CRON_RENEW_TIMEOUT']); } $param=json_encode($params); $hash=md5($path.$param.($crontime>0?$crontime:"")); $cron_id=""; if($crontime>0){ $cron=Model::find()->where("`hash`='{$hash}'")->one(); if($cron){ $cron_id=$cron->getAttribute('id'); } }else{ //$cron_id=$cron->where("`hash`='{$hash}' and addtime>".(NOW_TIME-$CRON_RENEW_TIMEOUT))->getField('id'); $ex_cron=Model::find() ->where("`hash`='{$hash}'") ->select('id,created_at,status') ->orderBy('created_at desc') ->one(); if($ex_cron){ $cron_id=$ex_cron['id']; if($ex_cron['status']==Model::CRON_STATUS_RUN_OK || self::CRON_STATUS_RUN_FIL){ //可以新建任务 $cron_id=""; }elseif(NOW_TIME-$ex_cron['created_at']>Model::CRON_RENEW_TIMEOUT){ if ($ex_cron['status']!=Model::CRON_STATUS_RUN_FIL){ //修复任务 Model::updateAll([ 'retry'=>0, 'priority'=>3, ],['id'=>$ex_cron['id']]); } } } } if($cron_id) return; $cron=new Model(); $cron->hash=$hash; $cron->path=$path; $cron->param=$param; $cron->method=Model::CRON_METHOD_POST; $cron->crontime=$crontime+0; $cron->priority=$priority+0; $cron->save(); } static function result($cron_id,$result=true,$message=""){ if(!$cron_id) return; $cron=Model::findOne($cron_id); if(!$cron->id) return; $cron->updated_at=NOW_TIME; $cron->retry+=1; if($result){ $cron->status=self::CRON_STATUS_RUN_OK; }elseif($cron->retry>=self::CRON_RETRY_COUNT){ $cron->status=self::CRON_STATUS_RUN_FIL; } if(!$result){ $cron->priority-=1; if('api/cron.syncCommodities'==$cron->path){//顺延到同步商品任务最后 $lastcron=Model::find()->where(['path'=>$cron->path])->orderBy('crontime desc')->one(); $cron->crontime=($lastcron->crontime?$lastcron->crontime:time())+60; } } $cron->message=substr($message,0,250); $cron->save(); } static function crontab(){ $crontab=\Yii::$app->params['crontab']; foreach ($crontab as $cron){ list($path,$param,$crontime,$priority)=$cron; if(strstr($crontime,':')){ $crontime=getDayTime($crontime); }else{ $crontime=0; } if(is_array($param)){ $param_str=json_encode($param); }else{ $param_str='[]'; } $one=Model::find() ->where("status=".self::CRON_STATUS_DEF." and path='{$path}' and param='{$param_str}'") ->one(); if($one){ //update $one->priority=$priority; $one->crontime=$crontime; $one->save(); echo "update cron ".json_encode($cron)."\n"; }else{ self::create($path,$param,$crontime,$priority); echo "new cron ".json_encode($cron)."\n"; } } } //添加一条周期时间的任务 static function crontabNext($croning){ $crontab=\Yii::$app->params['crontab']; foreach ($crontab as $cron){ list($path,$param,$crontime,$priority)=$cron; if(is_array($param)){ $param_str=json_encode($param); }else{ $param_str='[]'; } if($path==$croning['path'] && $param_str==$croning['param']){ if(strstr($crontime,':')){ $crontime=getDayTime($crontime); }else{ $crontime=NOW_TIME+$crontime; } self::create($path,$param,$crontime,$priority); return; } } } } |
\models\cron\Lib.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
<?php namespace app\models\cron; use GuzzleHttp\Pool; use GuzzleHttp\Client; use GuzzleHttp\Psr7\Request; use yii\helpers\Url; if(!defined('NOW_TIME')){ define('NOW_TIME',time()); } class Lib{ private $data,$base_url; private $totalPageCount; private $counter = 1; private $concurrency = 10; // 同时并发抓取 function implement(){ $this->data=Model::find() ->andWhere([ 'and', ['=','status',Model::CRON_STATUS_DEF], ['<','crontime',NOW_TIME], ]) ->andWhere([ 'or', ['<','updated_at',NOW_TIME-Model::CRON_RETRY_TIME], 'updated_at=created_at', ]) ->orderBy('`priority` desc,`created_at` asc') ->limit(30) //->createCommand()->getRawSql(); ->asArray()->all(); if(!$this->data) return; $this->totalPageCount=count($this->data); $this->base_url="http://127.0.0.1".(YII_DEBUG?'/ERP-V3/web/':'/'); $client = new Client(['base_uri' => $this->base_url]); $requests = function ($total) use ($client) { foreach ($this->data as $key => $cron) { $uri = $cron['path']; $param=$cron['param']?json_decode($cron['param'],true):[]; yield function() use ($client, $uri,$param) { return $client->postAsync($uri,['form_params'=>$param]); }; } }; $pool = new Pool($client, $requests($this->totalPageCount), [ 'concurrency' => $this->concurrency, 'fulfilled' => function ($response, $index){ $message=$response->getBody()->getContents(); $res = @json_decode($message); if(isset($res->code) && $res->code==200){ Model::result($this->data[$index]['id'],true,$message); Model::crontabNext($this->data[$index]); }else{ $message=preg_replace("/<[^>]+>/","",$message); Model::result($this->data[$index]['id'],false,$message); } $this->implementMessage($index,$message,isset($res->code)?$res->code:500); }, 'rejected' => function ($reason, $index){ list($message[],$message[],$message[])=explode("\n",$reason); $message=preg_replace("/<[^>]+>/","",implode("\n",$message)); Model::result($this->data[$index]['id'],false,$message); $this->implementMessage($index,$message); }, ]); // 开始发送请求 $promise = $pool->promise(); $promise->wait(); } function implementMessage($index,$message,$code=500){ echo "[{$code}]\t$index:$message\n=============================\n"; } } |
\commands\InitController.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
<?php namespace app\commands; use app\models\cron\Lib as cronLib; use app\models\cron\Model; use app\models\system\Role; use app\models\User ; class InitController extends \yii\console\Controller { public function actionCron(){ $cronLib=new cronLib(); $cronLib->implement(); } public function actionCrontab(){ Model::crontab(); } } |
\config\crontab.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
<?php /** * 周期性任务 * * 添加完成后 * 执行 yii init/crontab */ $crontab=[ //['路径',['参数名'=>'参数值'],'周期时间','优先级'], //['api/cron.flushMyAct',['params'=>'params'],"03:00:00",0], //每天03:00:00执行 //['api/cron.flushMyActx',['params'=>'params'],3600,0], //每小时执行 //['api/login',['username'=>'admin','password'=>'123456',],300,0], //每五分钟执行 //以上为参考格式,请勿删除,任务添加到下方 ['api/cron.flushOrderList',[],300,0], //每五分钟执行 ['api/cron.syncFeeds',[],86400*7,0], //每周执行 ['api/cron.matchTrackNo',[],600,0], //每10分钟执行 ]; return ['crontab'=>$crontab]; |
表结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
/*表: cron*/------------- /*列信息*/----------- Field Type Collation Null Key Default Extra ---------- ------------ --------------- ------ ------ ------- -------------- id int(11) (NULL) NO PRI (NULL) auto_increment path varchar(100) utf8_general_ci NO (NULL) param text utf8_general_ci YES (NULL) method tinyint(1) (NULL) YES 1 header varchar(250) utf8_general_ci YES (NULL) status smallint(1) (NULL) YES 0 retry smallint(1) (NULL) YES 0 created_at int(11) (NULL) NO 0 updated_at int(11) (NULL) NO 0 message varchar(250) utf8_general_ci YES (NULL) crontime int(11) (NULL) YES (NULL) hash char(32) utf8_general_ci YES (NULL) priority smallint(1) (NULL) YES 0 /*索引信息*/-------------- Table Non_unique Key_name Seq_in_index Column_name ------ ---------- -------- ------------ ----------- cron 0 PRIMARY 1 id /*DDL 信息*/------------ CREATE TABLE `cron` ( `id` int(11) NOT NULL AUTO_INCREMENT, `path` varchar(100) NOT NULL, `param` text, `method` tinyint(1) DEFAULT '1', `header` varchar(250) DEFAULT NULL, `status` smallint(1) DEFAULT '0', `retry` smallint(1) DEFAULT '0', `created_at` int(11) NOT NULL DEFAULT '0', `updated_at` int(11) NOT NULL DEFAULT '0', `message` varchar(250) DEFAULT NULL, `crontime` int(11) DEFAULT NULL, `hash` char(32) DEFAULT NULL, `priority` smallint(1) DEFAULT '0', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 |