此 cron 优点:
1,linux crontab 只需加入一行:* * * * * php yii init/cron(五个时间字段之间需用空格分隔,表示每分钟执行一次)
2, cron 通过 config 文件配置,修改配置后需执行 php yii init/crontab
3,重试机制,任务出错后将延缓5分钟执行,直到达到最大尝试次数后,标识为失败
4,兼容周期性任务、一次性任务
5,并发执行任务(通过 Guzzle 请求池并发发起 HTTP 请求,并非真正的多线程),任务之间互不阻塞
目录结构
/
|-models
|--cron
|---Model.php
|---Lib.php
|-commands
|--InitController.php
|-config
|--crontab.php
\models\cron\Model.php
<?php
namespace app\models\cron;

use yii\behaviors\TimestampBehavior;
use yii\db\ActiveRecord;

if (!defined('NOW_TIME')) {
    define('NOW_TIME', time());
}

/**
 * ActiveRecord for the `{{%cron}}` table: one row per queued task.
 *
 * A task is identified by `path` + JSON-encoded `param` (+ `crontime` for
 * scheduled tasks), condensed into `hash`. The static helpers create tasks,
 * record execution results and (re)build the periodic tasks listed in
 * `Yii::$app->params['crontab']`.
 */
class Model extends ActiveRecord
{
    /** Task is waiting to be executed. */
    const CRON_STATUS_DEF = 0;
    /** Task is running (not set anywhere in this class). */
    const CRON_STATUS_RUN = 1;
    /** Task failed permanently. */
    const CRON_STATUS_RUN_FIL = -1;
    /** Task finished successfully. */
    const CRON_STATUS_RUN_OK = 2;

    /** Number of attempts after which a task is marked as failed. */
    const CRON_RETRY_COUNT = 50;
    /** Retry delay in seconds. */
    const CRON_RETRY_TIME = 300;
    /** Default age (seconds) after which a still-pending one-off task is repaired. */
    const CRON_RENEW_TIMEOUT = 3600;
    /** Key inside `$params` that overrides CRON_RENEW_TIMEOUT for a single create() call. */
    const CRON_RENEW_TIMEOUT_KEY = 'CRON_RENEW_TIMEOUT';
    /** Original note: retry is no longer used for the decision; every failed run lowers the task's priority. */
    const CRON_TIMEOUT = 86400;

    const CRON_METHOD_GET = 0;
    const CRON_METHOD_POST = 1;

    /**
     * @inheritdoc
     */
    public static function tableName()
    {
        return '{{%cron}}';
    }

    /**
     * @inheritdoc
     */
    public function behaviors()
    {
        return [
            TimestampBehavior::className(),
        ];
    }

    /**
     * @inheritdoc
     */
    public function rules()
    {
        return [
            [['path', 'hash'], 'required'],
        ];
    }

    /**
     * Queues a task unless an equivalent one already exists.
     *
     * - `$crontime > 0` (scheduled task): skipped when any row with the same hash exists.
     * - `$crontime <= 0` (one-off task): a new row is created only when no row with the
     *   same hash exists or the most recent one has finished (OK or failed). A still
     *   pending row older than the renew timeout is repaired (retry reset to 0,
     *   priority raised to 3) instead of being duplicated.
     *
     * @param string $path     Request path of the task.
     * @param array  $params   Request parameters; the optional key CRON_RENEW_TIMEOUT
     *                         overrides the renew timeout and is not stored.
     * @param int    $crontime Unix timestamp to run at, or 0 for "as soon as possible".
     * @param int    $priority Higher values are executed first.
     * @return void
     */
    public static function create($path, $params = array(), $crontime = 0, $priority = 0)
    {
        $renewTimeout = self::CRON_RENEW_TIMEOUT;
        if (isset($params[self::CRON_RENEW_TIMEOUT_KEY])) {
            $renewTimeout = $params[self::CRON_RENEW_TIMEOUT_KEY];
            unset($params[self::CRON_RENEW_TIMEOUT_KEY]);
        }

        $param = json_encode($params);
        $hash = md5($path . $param . ($crontime > 0 ? $crontime : ""));

        if ($crontime > 0) {
            if (self::find()->where(['hash' => $hash])->exists()) {
                return;
            }
        } else {
            $latest = self::find()
                ->select(['id', 'created_at', 'status'])
                ->where(['hash' => $hash])
                ->orderBy(['created_at' => SORT_DESC])
                ->one();

            if ($latest !== null) {
                // The original condition was `status == OK || FIL`, which is always
                // true because -1 is truthy; compare against both states explicitly.
                $finished = in_array(
                    (int)$latest['status'],
                    [self::CRON_STATUS_RUN_OK, self::CRON_STATUS_RUN_FIL],
                    true
                );
                if (!$finished) {
                    if (NOW_TIME - $latest['created_at'] > $renewTimeout) {
                        // Repair the stale pending task rather than queueing a duplicate.
                        self::updateAll(
                            ['retry' => 0, 'priority' => 3],
                            ['id' => $latest['id']]
                        );
                    }
                    return;
                }
            }
        }

        $cron = new self();
        $cron->hash = $hash;
        $cron->path = $path;
        $cron->param = $param;
        $cron->method = self::CRON_METHOD_POST;
        $cron->crontime = $crontime + 0;
        $cron->priority = $priority + 0;
        $cron->save();
    }

    /**
     * Records the outcome of one execution attempt.
     *
     * Increments `retry`; on success marks the task OK, on failure lowers its
     * priority by one and marks it failed once CRON_RETRY_COUNT attempts are reached.
     * Failed `api/cron.syncCommodities` tasks are pushed 60 seconds behind the
     * latest task with the same path.
     *
     * @param int    $cron_id Primary key of the task; falsy values are ignored.
     * @param bool   $result  Whether the attempt succeeded.
     * @param string $message Response or error text; stored truncated to 250 characters.
     * @return void
     */
    public static function result($cron_id, $result = true, $message = "")
    {
        if (!$cron_id) {
            return;
        }
        $cron = self::findOne($cron_id);
        if ($cron === null) {
            return;
        }

        $cron->updated_at = NOW_TIME;
        $cron->retry += 1;

        if ($result) {
            $cron->status = self::CRON_STATUS_RUN_OK;
        } else {
            if ($cron->retry >= self::CRON_RETRY_COUNT) {
                $cron->status = self::CRON_STATUS_RUN_FIL;
            }
            $cron->priority -= 1;
            if ('api/cron.syncCommodities' == $cron->path) {
                // Postpone behind the last commodity-sync task.
                $lastcron = self::find()
                    ->where(['path' => $cron->path])
                    ->orderBy('crontime desc')
                    ->one();
                $base = ($lastcron !== null && $lastcron->crontime) ? $lastcron->crontime : time();
                $cron->crontime = $base + 60;
            }
        }

        // `message` is varchar(250); cut by characters so a multibyte sequence is never split.
        $cron->message = mb_substr((string)$message, 0, 250, 'UTF-8');
        $cron->save();
    }

    /**
     * Synchronises the periodic tasks from `Yii::$app->params['crontab']` into the table.
     *
     * For every configured entry a pending row with the same path and params is
     * updated (priority, crontime); otherwise a new task is created. Entries whose
     * period is a clock string containing ':' are scheduled through getDayTime()
     * (a global helper not defined in this file); interval entries get crontime 0.
     *
     * @return void
     */
    public static function crontab()
    {
        $crontab = \Yii::$app->params['crontab'];
        foreach ($crontab as $cron) {
            list($path, $param, $crontime, $priority) = $cron;
            $param = self::paramsOf($param);
            $crontime = self::isClockTime($crontime) ? getDayTime($crontime) : 0;

            $one = self::find()
                ->where([
                    'status' => self::CRON_STATUS_DEF,
                    'path' => $path,
                    'param' => self::storedParamJson($param),
                ])
                ->one();

            if ($one) {
                $one->priority = $priority;
                $one->crontime = $crontime;
                $one->save();
                echo "update cron " . json_encode($cron) . "\n";
            } else {
                self::create($path, $param, $crontime, $priority);
                echo "new cron " . json_encode($cron) . "\n";
            }
        }
    }

    /**
     * Queues the next run of a periodic task that has just been executed.
     *
     * Looks up the configured entry matching the executed task's path and stored
     * params and creates its next occurrence: getDayTime() for clock strings,
     * NOW_TIME + interval otherwise. Only the first matching entry is used.
     *
     * @param array $croning Executed task row; reads the `path` and `param` keys.
     * @return void
     */
    public static function crontabNext($croning)
    {
        $crontab = \Yii::$app->params['crontab'];
        foreach ($crontab as $cron) {
            list($path, $param, $crontime, $priority) = $cron;
            $param = self::paramsOf($param);

            if ($path == $croning['path'] && self::storedParamJson($param) == $croning['param']) {
                $crontime = self::isClockTime($crontime)
                    ? getDayTime($crontime)
                    : NOW_TIME + $crontime;
                self::create($path, $param, $crontime, $priority);
                return;
            }
        }
    }

    /**
     * Normalises a configured params value: anything that is not an array becomes [].
     */
    private static function paramsOf($param)
    {
        return is_array($param) ? $param : array();
    }

    /**
     * Returns the JSON that create() stores in `param` for the given params
     * (i.e. without the CRON_RENEW_TIMEOUT override key).
     */
    private static function storedParamJson(array $param)
    {
        unset($param[self::CRON_RENEW_TIMEOUT_KEY]);
        return json_encode($param);
    }

    /**
     * Tells whether a configured period is a clock string such as "03:00:00".
     */
    private static function isClockTime($crontime)
    {
        return is_string($crontime) && strpos($crontime, ':') !== false;
    }
}
\models\cron\Lib.php
<?php
namespace app\models\cron;

use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use yii\helpers\Url;

if (!defined('NOW_TIME')) {
    define('NOW_TIME', time());
}

/**
 * Executes due cron tasks by POSTing them concurrently to the local web application.
 */
class Lib
{
    private $data, $base_url;
    private $concurrency = 10; // number of requests in flight at the same time

    /**
     * Picks up to 30 due tasks and runs them through a Guzzle request pool.
     *
     * A task is due when its status is CRON_STATUS_DEF, its crontime is in the
     * past, and it was either never touched (updated_at = created_at) or last
     * touched more than CRON_RETRY_TIME seconds ago. A response whose JSON body
     * has `code == 200` counts as success and triggers scheduling of the next
     * periodic run; everything else is recorded as a failure.
     *
     * @return void
     */
    public function implement()
    {
        $this->data = Model::find()
            ->andWhere([
                'and',
                ['=', 'status', Model::CRON_STATUS_DEF],
                ['<', 'crontime', NOW_TIME],
            ])
            ->andWhere([
                'or',
                ['<', 'updated_at', NOW_TIME - Model::CRON_RETRY_TIME],
                'updated_at=created_at',
            ])
            ->orderBy('`priority` desc,`created_at` asc')
            ->limit(30)
            ->asArray()
            ->all();

        if (!$this->data) {
            return;
        }

        $this->base_url = "http://127.0.0.1" . (YII_DEBUG ? '/ERP-V3/web/' : '/');
        $client = new Client(['base_uri' => $this->base_url]);

        $pool = new Pool($client, $this->buildRequests($client), [
            'concurrency' => $this->concurrency,
            'fulfilled' => function ($response, $index) {
                $message = $response->getBody()->getContents();
                $res = json_decode($message);
                $code = isset($res->code) ? $res->code : 500;

                if (isset($res->code) && $res->code == 200) {
                    Model::result($this->data[$index]['id'], true, $message);
                    Model::crontabNext($this->data[$index]);
                } else {
                    // Strip markup so an HTML error page is stored as readable text.
                    $message = preg_replace("/<[^>]+>/", "", $message);
                    Model::result($this->data[$index]['id'], false, $message);
                }
                $this->implementMessage($index, $message, $code);
            },
            'rejected' => function ($reason, $index) {
                // Keep at most the first three lines of the failure text; slicing
                // avoids undefined-offset notices when there are fewer than three.
                $lines = array_slice(explode("\n", (string)$reason), 0, 3);
                $message = preg_replace("/<[^>]+>/", "", implode("\n", $lines));
                Model::result($this->data[$index]['id'], false, $message);
                $this->implementMessage($index, $message);
            },
        ]);

        // Send the requests and block until all of them have settled.
        $pool->promise()->wait();
    }

    /**
     * Prints one result line for a processed task.
     *
     * @param int|string $index   Position of the task in the current batch.
     * @param string     $message Response or error text.
     * @param int        $code    Result code; 500 when none was returned.
     * @return void
     */
    public function implementMessage($index, $message, $code = 500)
    {
        echo "[{$code}]\t$index:$message\n=============================\n";
    }

    /**
     * Lazily yields one deferred POST request per loaded task, keyed by the
     * task's position in `$this->data` so pool callbacks can map results back.
     *
     * @param Client $client
     * @return \Generator
     */
    private function buildRequests(Client $client)
    {
        foreach ($this->data as $key => $cron) {
            $uri = $cron['path'];
            $decoded = $cron['param'] ? json_decode($cron['param'], true) : [];
            // A stored value such as "null" decodes to a non-array; send no form fields then.
            $param = is_array($decoded) ? $decoded : [];

            yield $key => function () use ($client, $uri, $param) {
                return $client->postAsync($uri, ['form_params' => $param]);
            };
        }
    }
}
\commands\InitController.php
<?php
namespace app\commands;

use app\models\cron\Lib as cronLib;
use app\models\cron\Model;
use app\models\system\Role;
use app\models\User;

/**
 * Console commands for the cron system.
 */
class InitController extends \yii\console\Controller
{
    /**
     * Runs one batch of cron tasks by delegating to the cron Lib.
     */
    public function actionCron()
    {
        (new cronLib())->implement();
    }

    /**
     * Loads the configured periodic tasks into the cron table.
     */
    public function actionCrontab()
    {
        Model::crontab();
    }
}
\config\crontab.php
<?php
/**
 * Periodic task definitions.
 *
 * After adding or changing entries, run:
 *   yii init/crontab
 */
$crontab = [
    // Format: ['path', ['param name' => 'param value'], 'period', 'priority'],
    //['api/cron.flushMyAct',['params'=>'params'],"03:00:00",0],          // runs every day at 03:00:00
    //['api/cron.flushMyActx',['params'=>'params'],3600,0],               // runs every hour
    //['api/login',['username'=>'admin','password'=>'123456',],300,0],    // runs every five minutes
    // The entries above are reference examples - do not delete them; add tasks below.
    [
        'api/cron.flushOrderList',
        [],
        300,
        0,
    ], // runs every five minutes
    [
        'api/cron.syncFeeds',
        [],
        7 * 86400,
        0,
    ], // runs every week
    [
        'api/cron.matchTrackNo',
        [],
        600,
        0,
    ], // runs every 10 minutes
];

return [
    'crontab' => $crontab,
];
表结构
/*表: cron*/
-------------

/*列信息*/
-----------

Field       Type          Collation        Null    Key     Default  Extra
----------  ------------  ---------------  ------  ------  -------  --------------
id          int(11)       (NULL)           NO      PRI     (NULL)   auto_increment
path        varchar(100)  utf8_general_ci  NO              (NULL)
param       text          utf8_general_ci  YES             (NULL)
method      tinyint(1)    (NULL)           YES             1
header      varchar(250)  utf8_general_ci  YES             (NULL)
status      smallint(1)   (NULL)           YES             0
retry       smallint(1)   (NULL)           YES             0
created_at  int(11)       (NULL)           NO              0
updated_at  int(11)       (NULL)           NO              0
message     varchar(250)  utf8_general_ci  YES             (NULL)
crontime    int(11)       (NULL)           YES             (NULL)
hash        char(32)      utf8_general_ci  YES             (NULL)
priority    smallint(1)   (NULL)           YES             0

/*索引信息*/
--------------

Table   Non_unique  Key_name  Seq_in_index  Column_name
------  ----------  --------  ------------  -----------
cron    0           PRIMARY   1             id

/*DDL 信息*/
------------

CREATE TABLE `cron` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `path` varchar(100) NOT NULL,
  `param` text,
  `method` tinyint(1) DEFAULT '1',
  `header` varchar(250) DEFAULT NULL,
  `status` smallint(1) DEFAULT '0',
  `retry` smallint(1) DEFAULT '0',
  `created_at` int(11) NOT NULL DEFAULT '0',
  `updated_at` int(11) NOT NULL DEFAULT '0',
  `message` varchar(250) DEFAULT NULL,
  `crontime` int(11) DEFAULT NULL,
  `hash` char(32) DEFAULT NULL,
  `priority` smallint(1) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
打赏
微信扫一扫,打赏作者吧~