此 cron 优点:
1,linux crontab 只需加入一行:* * * * * php yii init/cron
2, cron 通过 config 文件配置,修改配置后需执行 php yii init/crontab
3,重试机制,任务出错后将延缓5分钟执行,直到达到最大尝试次数后,标识为失败
4,兼容周期性任务、一次性任务
5,并发执行任务(基于 Guzzle 异步请求池,并非真正的多线程),任务之间互不阻塞
目录结构
/
|-models
|--cron
|---Model.php
|---Lib.php
|-commands
|--InitController.php
|-config
|--crontab.php
\models\cron\Model.php
<?php
namespace app\models\cron;
use yii\behaviors\TimestampBehavior;
use yii\db\ActiveRecord;
// Pin a single "current time" for the whole process, unless something
// loaded earlier has already defined it.
defined('NOW_TIME') || define('NOW_TIME', time());
/**
 * ActiveRecord for one row of the cron task table ({{%cron}}).
 *
 * Besides the record itself, this class offers static helpers to queue a
 * task (create), record the outcome of a run (result), and synchronise the
 * table with the periodic tasks configured in the `crontab` application
 * parameter (crontab / crontabNext).
 */
class Model extends ActiveRecord
{
    /** Task is waiting to be executed. */
    const CRON_STATUS_DEF = 0;
    /** Task is running (declared here; never assigned inside this class). */
    const CRON_STATUS_RUN = 1;
    /** Task failed permanently. */
    const CRON_STATUS_RUN_FIL = -1;
    /** Task finished successfully. */
    const CRON_STATUS_RUN_OK = 2;

    /** Number of attempts after which a failing task is marked as failed. */
    const CRON_RETRY_COUNT = 50;
    /** Retry delay, in seconds. */
    const CRON_RETRY_TIME = 300;
    /** Default period (seconds) after which a still-pending task is "repaired". */
    const CRON_RENEW_TIMEOUT = 3600;
    /** Key in the params array that overrides CRON_RENEW_TIMEOUT for one call. */
    const CRON_RENEW_TIMEOUT_KEY = 'CRON_RENEW_TIMEOUT';
    /**
     * Original author's note: retry is no longer used for this decision;
     * every failed run lowers the cron's priority instead.
     * (Not referenced anywhere in this class.)
     */
    const CRON_TIMEOUT = 86400;

    const CRON_METHOD_GET = 0;
    const CRON_METHOD_POST = 1;

    /**
     * @inheritdoc
     */
    public static function tableName()
    {
        return '{{%cron}}';
    }

    /**
     * @inheritdoc
     */
    public function behaviors()
    {
        return [
            TimestampBehavior::className(),
        ];
    }

    /**
     * @inheritdoc
     */
    public function rules()
    {
        return [
            [['path', 'hash'], 'required'],
        ];
    }

    /**
     * Queue a task unless an equivalent one is already queued.
     *
     * The task identity is md5(path . json(params) . crontime); crontime is
     * only part of the hash when it is greater than zero.
     *
     * - crontime > 0: if any row with the same hash exists, nothing happens.
     * - crontime = 0: the newest row with the same hash is inspected. If it
     *   has finished (succeeded or failed) a new row is created. Otherwise
     *   the existing row blocks creation, and when it is older than the
     *   renew timeout its retry counter is reset and its priority is set
     *   to 3.
     *
     * @param string $path     request path of the task
     * @param array  $params   request parameters; an optional
     *                         CRON_RENEW_TIMEOUT entry overrides the renew
     *                         timeout for this call and is not stored
     * @param int    $crontime scheduled time (unix timestamp), 0 for none
     * @param int    $priority task priority
     * @return void
     */
    public static function create($path, $params = array(), $crontime = 0, $priority = 0)
    {
        $renewTimeout = self::CRON_RENEW_TIMEOUT;
        if (isset($params[self::CRON_RENEW_TIMEOUT_KEY])) {
            $renewTimeout = $params[self::CRON_RENEW_TIMEOUT_KEY];
            unset($params[self::CRON_RENEW_TIMEOUT_KEY]);
        }

        $param = json_encode($params);
        $hash = md5($path . $param . ($crontime > 0 ? $crontime : ""));

        $cronId = "";
        if ($crontime > 0) {
            $existing = self::find()->where(['hash' => $hash])->one();
            if ($existing) {
                $cronId = $existing->getAttribute('id');
            }
        } else {
            $latest = self::find()
                ->where(['hash' => $hash])
                ->select(['id', 'created_at', 'status'])
                ->orderBy('created_at desc')
                ->one();
            if ($latest) {
                $status = (int)$latest['status'];
                // Both terminal states must be compared against the status.
                // The previous `|| self::CRON_STATUS_RUN_FIL` was always true,
                // so duplicates were always created and the repair below
                // could never run.
                $finished = $status === self::CRON_STATUS_RUN_OK
                    || $status === self::CRON_STATUS_RUN_FIL;
                if (!$finished) {
                    $cronId = $latest['id'];
                    // Honour the per-call override, not only the class constant.
                    if (NOW_TIME - $latest['created_at'] > $renewTimeout) {
                        // Repair the stale task: give it a fresh retry budget
                        // and raise its priority.
                        self::updateAll([
                            'retry' => 0,
                            'priority' => 3,
                        ], ['id' => $latest['id']]);
                    }
                }
            }
        }
        if ($cronId) {
            return;
        }

        $cron = new self();
        $cron->hash = $hash;
        $cron->path = $path;
        $cron->param = $param;
        $cron->method = self::CRON_METHOD_POST;
        $cron->crontime = $crontime + 0;
        $cron->priority = $priority + 0;
        if (!$cron->save()) {
            // Do not lose a task silently when validation fails.
            \Yii::warning('cron task not saved: ' . json_encode($cron->getErrors()), __METHOD__);
        }
    }

    /**
     * Record the outcome of one execution attempt.
     *
     * Every call increments `retry`. A successful run marks the task as
     * succeeded. A failed run lowers the priority by one and, once `retry`
     * reaches CRON_RETRY_COUNT, marks the task as failed.
     *
     * @param int    $cron_id primary key of the task; falsy values are ignored
     * @param bool   $result  whether the run succeeded
     * @param string $message text stored with the task (first 250 characters)
     * @return void
     */
    public static function result($cron_id, $result = true, $message = "")
    {
        if (!$cron_id) {
            return;
        }
        $cron = self::findOne($cron_id);
        if ($cron === null) {
            // The row may have been removed in the meantime.
            return;
        }

        $cron->updated_at = NOW_TIME;
        $cron->retry += 1;

        if ($result) {
            $cron->status = self::CRON_STATUS_RUN_OK;
        } else {
            if ($cron->retry >= self::CRON_RETRY_COUNT) {
                $cron->status = self::CRON_STATUS_RUN_FIL;
            }
            $cron->priority -= 1;
            if ('api/cron.syncCommodities' == $cron->path) {
                // Push the task behind the last scheduled commodity-sync task.
                $lastcron = self::find()
                    ->where(['path' => $cron->path])
                    ->orderBy('crontime desc')
                    ->one();
                $lastTime = ($lastcron !== null && $lastcron->crontime) ? $lastcron->crontime : time();
                $cron->crontime = $lastTime + 60;
            }
        }

        // Truncate by characters, not bytes, so a multi-byte character is
        // never cut in half before it is written to the varchar(250) column.
        $cron->message = mb_substr((string)$message, 0, 250, 'UTF-8');
        $cron->save();
    }

    /**
     * Synchronise the table with the `crontab` application parameter.
     *
     * For each configured entry, a pending row with the same path and
     * parameters gets its priority and crontime updated; if there is none,
     * a new task is queued through create(). A line is echoed per entry.
     *
     * A period containing ':' is converted with getDayTime(); any other
     * period results in crontime 0.
     *
     * @return void
     */
    public static function crontab()
    {
        $crontab = \Yii::$app->params['crontab'];
        foreach ($crontab as $cron) {
            list($path, $param, $crontime, $priority) = $cron;

            // NOTE(review): getDayTime() is defined outside this file;
            // presumably it turns "HH:MM:SS" into a timestamp - confirm.
            if (strpos((string)$crontime, ':') !== false) {
                $crontime = getDayTime($crontime);
            } else {
                $crontime = 0;
            }
            $paramStr = self::encodeParam($param);

            // Array-format condition: values are bound as parameters, so
            // quotes or backslashes in the JSON cannot break the query.
            $one = self::find()
                ->where([
                    'status' => self::CRON_STATUS_DEF,
                    'path' => $path,
                    'param' => $paramStr,
                ])
                ->one();
            if ($one) {
                $one->priority = $priority;
                $one->crontime = $crontime;
                $one->save();
                echo "update cron " . json_encode($cron) . "\n";
            } else {
                self::create($path, $param, $crontime, $priority);
                echo "new cron " . json_encode($cron) . "\n";
            }
        }
    }

    /**
     * Queue the next occurrence of a periodic task.
     *
     * Finds the first `crontab` entry whose path and encoded parameters
     * equal those of the given row and queues it again: at getDayTime()
     * for a period containing ':', otherwise at NOW_TIME plus the period.
     *
     * @param array $croning task row; its 'path' and 'param' entries are read
     * @return void
     */
    public static function crontabNext($croning)
    {
        $crontab = \Yii::$app->params['crontab'];
        foreach ($crontab as $cron) {
            list($path, $param, $crontime, $priority) = $cron;
            if ($path != $croning['path'] || self::encodeParam($param) != $croning['param']) {
                continue;
            }
            if (strpos((string)$crontime, ':') !== false) {
                $crontime = getDayTime($crontime);
            } else {
                $crontime = NOW_TIME + $crontime;
            }
            self::create($path, $param, $crontime, $priority);
            return;
        }
    }

    /**
     * Encode configured parameters the same way create() stores them;
     * anything that is not an array is treated as an empty parameter list.
     *
     * @param mixed $param
     * @return string JSON text
     */
    private static function encodeParam($param)
    {
        return is_array($param) ? json_encode($param) : '[]';
    }
}
\models\cron\Lib.php
<?php
namespace app\models\cron;
use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use yii\helpers\Url;
// Pin a single "current time" for the whole process, unless something
// loaded earlier has already defined it.
defined('NOW_TIME') || define('NOW_TIME', time());
/**
 * Runs due cron tasks by POSTing each one to the local web application
 * through a Guzzle request pool and recording the outcome via Model.
 */
class Lib
{
    /** @var array[] task rows selected for the current run */
    private $data;
    /** @var string base URI the task paths are resolved against */
    private $base_url;
    /** @var int number of tasks selected for the current run */
    private $totalPageCount;
    /** @var int maximum number of requests in flight at the same time */
    private $concurrency = 10;

    /**
     * Select up to 30 due tasks and execute them concurrently.
     *
     * A task is due when its status is CRON_STATUS_DEF, its crontime is in
     * the past, and it either has never been updated (updated_at equals
     * created_at) or was last updated more than CRON_RETRY_TIME seconds ago.
     * Tasks are ordered by priority (descending), then by creation time.
     *
     * A response whose JSON body has `code` 200 counts as success and the
     * next occurrence of the task is queued; everything else is a failure.
     *
     * NOTE(review): selected rows are not marked while their request is in
     * flight, so a second run started before this one finishes could select
     * the same never-run rows again - confirm whether that can happen in
     * the deployment.
     *
     * @return void
     */
    public function implement()
    {
        $this->data = Model::find()
            ->andWhere([
                'and',
                ['=', 'status', Model::CRON_STATUS_DEF],
                ['<', 'crontime', NOW_TIME],
            ])
            ->andWhere([
                'or',
                ['<', 'updated_at', NOW_TIME - Model::CRON_RETRY_TIME],
                'updated_at=created_at',
            ])
            ->orderBy('`priority` desc,`created_at` asc')
            ->limit(30)
            ->asArray()->all();
        if (!$this->data) {
            return;
        }
        $this->totalPageCount = count($this->data);

        // The tasks are served by the web application on this same host.
        $this->base_url = "http://127.0.0.1" . (YII_DEBUG ? '/ERP-V3/web/' : '/');
        $client = new Client(['base_uri' => $this->base_url]);

        $requests = function () use ($client) {
            foreach ($this->data as $key => $cron) {
                $uri = $cron['path'];
                $param = $cron['param'] ? json_decode($cron['param'], true) : [];
                // Yield under the row's own key so the $index handed to the
                // callbacks below always addresses the matching row.
                yield $key => function () use ($client, $uri, $param) {
                    return $client->postAsync($uri, ['form_params' => $param]);
                };
            }
        };

        $pool = new Pool($client, $requests(), [
            'concurrency' => $this->concurrency,
            'fulfilled' => function ($response, $index) {
                $message = $response->getBody()->getContents();
                // json_decode() returns null for invalid JSON; nothing to suppress.
                $res = json_decode($message);
                if (isset($res->code) && $res->code == 200) {
                    Model::result($this->data[$index]['id'], true, $message);
                    Model::crontabNext($this->data[$index]);
                } else {
                    // Strip markup so an HTML error page is stored as plain text.
                    $message = preg_replace("/<[^>]+>/", "", $message);
                    Model::result($this->data[$index]['id'], false, $message);
                }
                $this->implementMessage($index, $message, isset($res->code) ? $res->code : 500);
            },
            'rejected' => function ($reason, $index) {
                // Keep at most the first three lines of the reason. Slicing,
                // unlike destructuring into three fixed slots, is also safe
                // when the text has fewer than three lines.
                $lines = array_slice(explode("\n", (string)$reason), 0, 3);
                $message = preg_replace("/<[^>]+>/", "", implode("\n", $lines));
                Model::result($this->data[$index]['id'], false, $message);
                $this->implementMessage($index, $message);
            },
        ]);

        // Send the requests and block until all of them have settled.
        $pool->promise()->wait();
    }

    /**
     * Print the outcome of one task to standard output.
     *
     * @param int|string $index   position of the task in the current batch
     * @param string     $message text to print
     * @param int        $code    result code, 500 when none is available
     * @return void
     */
    public function implementMessage($index, $message, $code = 500)
    {
        echo "[{$code}]\t$index:$message\n=============================\n";
    }
}
\commands\InitController.php
<?php
namespace app\commands;
use app\models\cron\Lib as cronLib;
use app\models\cron\Model;
use app\models\system\Role;
use app\models\User ;
/**
 * Console commands for the cron task queue.
 */
class InitController extends \yii\console\Controller
{
    /**
     * `yii init/cron`: execute the tasks that are currently due.
     */
    public function actionCron()
    {
        (new cronLib())->implement();
    }

    /**
     * `yii init/crontab`: load the configured periodic tasks into the table.
     */
    public function actionCrontab()
    {
        Model::crontab();
    }
}
\config\crontab.php
<?php
/**
 * Periodic tasks.
 *
 * After changing this list, run:
 *   yii init/crontab
 */
$crontab = [];

// Entry format: ['path', ['param name' => 'param value'], 'period', 'priority'],
// ['api/cron.flushMyAct', ['params' => 'params'], "03:00:00", 0],            // every day at 03:00:00
// ['api/cron.flushMyActx', ['params' => 'params'], 3600, 0],                 // every hour
// ['api/login', ['username' => 'admin', 'password' => '123456',], 300, 0],   // every five minutes
// The lines above are reference examples, please keep them; add tasks below.

// Every five minutes.
$crontab[] = ['api/cron.flushOrderList', [], 5 * 60, 0];
// Every week.
$crontab[] = ['api/cron.syncFeeds', [], 7 * 86400, 0];
// Every ten minutes.
$crontab[] = ['api/cron.matchTrackNo', [], 10 * 60, 0];

return ['crontab' => $crontab];
表结构
/*表: cron*/
-------------

/*列信息*/
-----------
Field       Type          Collation        Null  Key  Default  Extra
----------  ------------  ---------------  ----  ---  -------  --------------
id          int(11)       (NULL)           NO    PRI  (NULL)   auto_increment
path        varchar(100)  utf8_general_ci  NO         (NULL)
param       text          utf8_general_ci  YES        (NULL)
method      tinyint(1)    (NULL)           YES        1
header      varchar(250)  utf8_general_ci  YES        (NULL)
status      smallint(1)   (NULL)           YES        0
retry       smallint(1)   (NULL)           YES        0
created_at  int(11)       (NULL)           NO         0
updated_at  int(11)       (NULL)           NO         0
message     varchar(250)  utf8_general_ci  YES        (NULL)
crontime    int(11)       (NULL)           YES        (NULL)
hash        char(32)      utf8_general_ci  YES        (NULL)
priority    smallint(1)   (NULL)           YES        0

/*索引信息*/
--------------
Table   Non_unique  Key_name  Seq_in_index  Column_name
------  ----------  --------  ------------  -----------
cron    0           PRIMARY   1             id

/*DDL 信息*/
------------
CREATE TABLE `cron` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `path` varchar(100) NOT NULL,
  `param` text,
  `method` tinyint(1) DEFAULT '1',
  `header` varchar(250) DEFAULT NULL,
  `status` smallint(1) DEFAULT '0',
  `retry` smallint(1) DEFAULT '0',
  `created_at` int(11) NOT NULL DEFAULT '0',
  `updated_at` int(11) NOT NULL DEFAULT '0',
  `message` varchar(250) DEFAULT NULL,
  `crontime` int(11) DEFAULT NULL,
  `hash` char(32) DEFAULT NULL,
  `priority` smallint(1) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8
打赏
微信扫一扫,打赏作者吧~