场景描述:
mysql 中order 表有一个触发器,新增的order 将自动创建一个订单编号;编号为日期+今日订单数量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
DELIMITER $$ USE `erp3.0`$$ DROP TRIGGER /*!50032 IF EXISTS */ `order_ext_id_insert`$$ CREATE /*!50017 DEFINER = 'root'@'%' */ TRIGGER `order_ext_id_insert` BEFORE INSERT ON `order` FOR EACH ROW BEGIN DECLARE num INT; SELECT COUNT(o.id) INTO num FROM `order` o WHERE o.created_at>=UNIX_TIMESTAMP(CURDATE()); SET new.ext_order_id=CONCAT(DATE_FORMAT(NOW(),'%Y%m%d'),num+1); END; $$ DELIMITER ; |
此触发器在单线程(命令行执行批量插入)下跑很正常,ext_order_id 不会重复;
但我这边的场景并发多线程写入order表,此时ext_order_id 会出现重复;
怎么解决呢?锁表,这个风险太大,不敢使用;看了看存储过程的优点,应该可以解决这个问题,写了存储过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
DELIMITER $$ USE `erp3.0`$$ DROP PROCEDURE IF EXISTS `generate_orderNo2`$$ CREATE DEFINER=`root`@`%` PROCEDURE `generate_orderNo2`(IN orderNamePre CHAR(2), IN num INT, OUT newOrderNo VARCHAR(25)) BEGIN DECLARE currentDate VARCHAR (15) ;-- 当前日期,有可能包含时分秒 DECLARE maxNo INT DEFAULT 0 ; -- 离现在最近的满足条件的订单编号的流水号最后5位,如:SH2013011000002的maxNo=2 -- DECLARE l_orderNo varCHAR (25) ;-- 新生成的订单编号 -- DECLARE oldDate DATE ;-- 离现在最近的满足条件的订单编号的日期 DECLARE oldOrderNo VARCHAR (25) DEFAULT '' ;-- 离现在最近的满足条件的订单编号 IF num = 8 THEN -- 根据年月日生成订单编号 SELECT DATE_FORMAT(NOW(), '%Y%m%d') INTO currentDate ;-- 订单编号形式:前缀+年月日+流水号,如:SH2013011000002 ELSEIF num = 14 THEN -- 根据年月日时分秒生成订单编号 SELECT DATE_FORMAT(NOW(), '%Y%m%d%H%i%s') INTO currentDate ; -- 订单编号形式:前缀+年月日时分秒+流水号,如:SH2013011010050700001,个人不推荐使用这种方法生成流水号 ELSE -- 根据年月日时分生成订单编号 SELECT DATE_FORMAT(NOW(), '%Y%m%d%H%i') INTO currentDate ;-- 订单形式:前缀+年月日时分+流水号,如:SH20130110100900005 END IF ; SELECT IFNULL(ext_order_id, '') INTO oldOrderNo FROM `order` WHERE SUBSTRING(ext_order_id, 3, num) = currentDate AND SUBSTRING(ext_order_id, 1, 2) = orderNamePre AND LENGTH(ext_order_id) = 7 + num ORDER BY id DESC LIMIT 1 ; -- 有多条时只显示离现在最近的一条 IF oldOrderNo != '' THEN SET maxNo = CONVERT(SUBSTRING(oldOrderNo, -5), DECIMAL) ;-- SUBSTRING(oldOrderNo, -5):订单编号如果不为‘‘截取订单的最后5位 END IF ; SELECT CONCAT(orderNamePre, currentDate, LPAD((maxNo + 1), 5, '0')) INTO newOrderNo ; -- LPAD((maxNo + 1), 5, '0'):如果不足5位,将用0填充左边 INSERT INTO `order` (ext_order_id, increment_id) VALUES (newOrderNo, newOrderNo) ; -- 向订单表中插入数据 -- set newOrderNo = l_orderNo; SELECT newOrderNo ; END$$ DELIMITER ; |
修改php代码,使用 generate_orderNo2 来处理;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
private function createNewOrder($orderRaw,$os){ //$cmd = \Yii::$app->db->createCommand("call generate_orderNo(@ext_order_id{$orderRaw['increment_id']})"); //$res = $cmd->queryOne(); $pre="VK"; $num=8; $cmd = \Yii::$app->db->createCommand("call generate_orderNo2(:pre,:num,@ext_order_id)"); $cmd->bindParam(':pre',$pre,\PDO::PARAM_STR,2); $cmd->bindParam(':num',$num,\PDO::PARAM_INT,1); $res = $cmd->queryOne(); $order = Order::find()->where(['ext_order_id'=>$res['newOrderNo']])->one(); $order->increment_id = $orderRaw['increment_id']; $order->payment_status = $orderRaw['status']; $order->payment_method = $orderRaw['payment']['method']; $order->save(); } |
尝试多线程执行写入,依然重复了,看来存储过程真的就是一些过程的封装,不能保证过程的独占性;
解决方案,我的订单数据来源是一个订单数据原始表(order_source),茅塞顿开,之前的触发器获取的是order表的今日数量,order表同时写入编号就会重复,而 order_source 表是已经存在的数据,不会发生变化,直接获取订单在 order_source 表中的位置排序即可,改变了触发器为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
DELIMITER $$ USE `erp3.0`$$ DROP TRIGGER /*!50032 IF EXISTS */ `order_ext_id_insert`$$ CREATE /*!50017 DEFINER = 'root'@'%' */ TRIGGER `order_ext_id_insert` BEFORE INSERT ON `order` FOR EACH ROW BEGIN DECLARE num INT; DECLARE nowday VARCHAR(10); /*SELECT IFNULL(MAX(RIGHT(ext_order_id,4)),0) INTO num FROM `order` WHERE MID(ext_order_id,1,8)=DATE_FORMAT(CURDATE(),'%Y%m%d');*/ /*SUBSTRING(OSA.`updated_at`, 1, 10)*/ SELECT COUNT(DISTINCT(OSA.order_id)),DATE_FORMAT(OSA.`updated_at`, '%Y%m%d') INTO num,nowday FROM `order_source` OSA INNER JOIN `order_source` OSB ON OSB.increment_id=NEW.increment_id WHERE OSA.`order_id`<=OSB.`order_id` AND OSA.`updated_at`>CONCAT( SUBSTRING(OSB.`updated_at`, 1, 10) ,' 00:00:00'); SET NEW.ext_order_id=CONCAT(nowday,RIGHT(10000+num,4)); END; $$ DELIMITER ; |
尝试多线程执行写入,没有重复,完美;
此种php多线程使用的是 GuzzleHttp 的多线程方案,同时发起30条任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
<?php namespace app\models\cron; use GuzzleHttp\Pool; use GuzzleHttp\Client; use GuzzleHttp\Psr7\Request; use yii\helpers\Url; if(!defined('NOW_TIME')){ define('NOW_TIME',time()); } class Lib{ private $data,$base_url; private $totalPageCount; private $counter = 1; private $concurrency = 10; // 同时并发抓取 function implement(){ $this->data=Model::find() ->andWhere([ 'and', ['=','status',Model::CRON_STATUS_DEF], ['<','crontime',NOW_TIME], ]) ->andWhere([ 'or', ['<','updated_at',NOW_TIME-Model::CRON_RETRY_TIME], 'updated_at=created_at', ]) ->orderBy('`priority` desc,`created_at` asc') ->limit(30) //->createCommand()->getRawSql(); ->asArray()->all(); if(!$this->data) return; $this->totalPageCount=count($this->data); $this->base_url="http://127.0.0.1".(YII_DEBUG?'/ERP-V3/web/':'/'); $client = new Client(['base_uri' => $this->base_url]); $requests = function ($total) use ($client) { foreach ($this->data as $key => $cron) { $uri = $cron['path']; $param=$cron['param']?json_decode($cron['param'],true):[]; yield function() use ($client, $uri,$param) { return $client->postAsync($uri,['form_params'=>$param]); }; } }; $pool = new Pool($client, $requests($this->totalPageCount), [ 'concurrency' => $this->concurrency, 'fulfilled' => function ($response, $index){ $message=$response->getBody()->getContents(); $res = @json_decode($message); if(isset($res->code) && $res->code==200){ Model::result($this->data[$index]['id'],true,$message); Model::crontabNext($this->data[$index]); }else{ $message=preg_replace("/<[^>]+>/","",$message); Model::result($this->data[$index]['id'],false,$message); } $this->implementMessage($index,$message,isset($res->code)?$res->code:500); }, 'rejected' => function ($reason, $index){ list($message[],$message[],$message[])=explode("\n",$reason); $message=preg_replace("/<[^>]+>/","",implode("\n",$message)); Model::result($this->data[$index]['id'],false,$message); $this->implementMessage($index,$message); }, ]); // 开始发送请求 $promise = $pool->promise(); $promise->wait(); } function implementMessage($index,$message,$code=500){ echo "[{$code}]\t$index:$message\n=============================\n"; } } |