X
    Categories: mysql

并发多线程引起的 mysql 触发器 编号重复

场景描述:

mysql 中order 表有一个触发器,新增的order 将自动创建一个订单编号;编号为日期+今日订单数量

DELIMITER $$

USE `erp3.0`$$

DROP TRIGGER /*!50032 IF EXISTS */ `order_ext_id_insert`$$

CREATE
    /*!50017 DEFINER = 'root'@'%' */
    TRIGGER `order_ext_id_insert` BEFORE INSERT ON `order` 
    FOR EACH ROW BEGIN
	DECLARE num INT;
	SELECT COUNT(o.id) INTO num FROM `order` o WHERE  o.created_at>=UNIX_TIMESTAMP(CURDATE());
	SET new.ext_order_id=CONCAT(DATE_FORMAT(NOW(),'%Y%m%d'),num+1);
END;
$$

DELIMITER ;

此触发器在单线程(命令行执行批量插入)下跑很正常,ext_order_id 不会重复;

但我这边的场景并发多线程写入order表,此时ext_order_id 会出现重复;

怎么解决呢?锁表,这个风险太大,不敢使用;看了看存储过程的优点,应该可以解决这个问题,写了存储过程:

DELIMITER $$

USE `erp3.0`$$

DROP PROCEDURE IF EXISTS `generate_orderNo2`$$

CREATE DEFINER=`root`@`%` PROCEDURE `generate_orderNo2`(IN orderNamePre CHAR(2), IN num INT, OUT newOrderNo VARCHAR(25))
BEGIN
	  DECLARE currentDate VARCHAR (15) ;-- 当前日期,有可能包含时分秒     
	  DECLARE maxNo INT DEFAULT 0 ; -- 离现在最近的满足条件的订单编号的流水号最后5位,如:SH2013011000002的maxNo=2     
	--  DECLARE l_orderNo varCHAR (25) ;-- 新生成的订单编号     
	--  DECLARE oldDate DATE ;-- 离现在最近的满足条件的订单编号的日期     
	  DECLARE oldOrderNo VARCHAR (25) DEFAULT '' ;-- 离现在最近的满足条件的订单编号     
	      
	  IF num = 8 THEN -- 根据年月日生成订单编号     
	    SELECT DATE_FORMAT(NOW(), '%Y%m%d') INTO currentDate ;-- 订单编号形式:前缀+年月日+流水号,如:SH2013011000002     
	  ELSEIF num = 14 THEN -- 根据年月日时分秒生成订单编号     
	    SELECT DATE_FORMAT(NOW(), '%Y%m%d%H%i%s') INTO currentDate ; -- 订单编号形式:前缀+年月日时分秒+流水号,如:SH2013011010050700001,个人不推荐使用这种方法生成流水号     
	  ELSE -- 根据年月日时分生成订单编号     
	    SELECT DATE_FORMAT(NOW(), '%Y%m%d%H%i') INTO currentDate ;-- 订单形式:前缀+年月日时分+流水号,如:SH20130110100900005     
	  END IF ;    
	      
	  SELECT IFNULL(ext_order_id, '') INTO oldOrderNo     
	  FROM `order`     
	  WHERE SUBSTRING(ext_order_id, 3, num) = currentDate     
	    AND SUBSTRING(ext_order_id, 1, 2) = orderNamePre     
	    AND LENGTH(ext_order_id) = 7 + num    
	  ORDER BY id DESC LIMIT 1 ; -- 有多条时只显示离现在最近的一条     
	      
	  IF oldOrderNo != '' THEN     
	    SET maxNo = CONVERT(SUBSTRING(oldOrderNo, -5), DECIMAL) ;-- SUBSTRING(oldOrderNo, -5):订单编号如果不为‘‘截取订单的最后5位     
	  END IF ;    
	  SELECT     
	    CONCAT(orderNamePre, currentDate,  LPAD((maxNo + 1), 5, '0')) INTO newOrderNo ; -- LPAD((maxNo + 1), 5, '0'):如果不足5位,将用0填充左边     
	      
	  INSERT INTO `order` (ext_order_id, increment_id) VALUES (newOrderNo, newOrderNo) ; -- 向订单表中插入数据     
	--    set newOrderNo = l_orderNo;     
	  SELECT     
	    newOrderNo ;    
    END$$

DELIMITER ;

修改php代码,使用 generate_orderNo2 来处理;

private function createNewOrder($orderRaw,$os){
        //$cmd = \Yii::$app->db->createCommand("call generate_orderNo(@ext_order_id{$orderRaw['increment_id']})");
        //$res = $cmd->queryOne();
        $pre="VK";
        $num=8;
        $cmd = \Yii::$app->db->createCommand("call generate_orderNo2(:pre,:num,@ext_order_id)");
        $cmd->bindParam(':pre',$pre,\PDO::PARAM_STR,2);
        $cmd->bindParam(':num',$num,\PDO::PARAM_INT,1);
        $res = $cmd->queryOne();

        $order = Order::find()->where(['ext_order_id'=>$res['newOrderNo']])->one();
        
        $order->increment_id = $orderRaw['increment_id'];
        $order->payment_status = $orderRaw['status'];
        $order->payment_method = $orderRaw['payment']['method'];
        $order->save();
        
}

尝试多线程执行写入,依然重复了,看来存储过程真的就是一些过程的封装,不能保证过程的独占性;

解决方案,我的订单数据来源是一个订单数据原始表(order_source),茅塞顿开,之前的触发器获取的是order表的今日数量,order表同时写入编号就会重复,而 order_source 表是已经存在的数据,不会发生变化,直接获取订单在 order_source 表中的位置排序即可,改变了触发器为:

DELIMITER $$

USE `erp3.0`$$

DROP TRIGGER /*!50032 IF EXISTS */ `order_ext_id_insert`$$

CREATE
    /*!50017 DEFINER = 'root'@'%' */
    TRIGGER `order_ext_id_insert` BEFORE INSERT ON `order` 
    FOR EACH ROW BEGIN
	DECLARE num INT;
	DECLARE nowday VARCHAR(10);
	/*SELECT IFNULL(MAX(RIGHT(ext_order_id,4)),0) INTO num FROM `order` WHERE MID(ext_order_id,1,8)=DATE_FORMAT(CURDATE(),'%Y%m%d');*/
	/*SUBSTRING(OSA.`updated_at`, 1, 10)*/	
	SELECT COUNT(DISTINCT(OSA.order_id)),DATE_FORMAT(OSA.`updated_at`, '%Y%m%d') INTO num,nowday 
		FROM `order_source` OSA
		INNER JOIN  `order_source` OSB ON OSB.increment_id=NEW.increment_id
		WHERE OSA.`order_id`<=OSB.`order_id` AND OSA.`updated_at`>CONCAT( SUBSTRING(OSB.`updated_at`, 1, 10) ,' 00:00:00');
	
	SET NEW.ext_order_id=CONCAT(nowday,RIGHT(10000+num,4));
END;
$$

DELIMITER ;

尝试多线程执行写入,没有重复,完美;

 

此种php多线程使用的是 GuzzleHttp 的多线程方案,同时发起30条任务:

<?php
namespace app\models\cron;

use GuzzleHttp\Pool;
use GuzzleHttp\Client;
use GuzzleHttp\Psr7\Request;
use yii\helpers\Url;
if(!defined('NOW_TIME')){
    define('NOW_TIME',time());
}
class Lib{
    private $data,$base_url;
    private $totalPageCount;
    private $counter        = 1;
    private $concurrency    = 10;  // 同时并发抓取
    function implement(){
        $this->data=Model::find()
            ->andWhere([
                'and',
                ['=','status',Model::CRON_STATUS_DEF],
                ['<','crontime',NOW_TIME],
            ])
            ->andWhere([
                'or',
                ['<','updated_at',NOW_TIME-Model::CRON_RETRY_TIME],
                'updated_at=created_at',
            ])
            ->orderBy('`priority` desc,`created_at` asc')
            ->limit(30)
            //->createCommand()->getRawSql();
            ->asArray()->all();
        if(!$this->data) return;
        $this->totalPageCount=count($this->data);
        $this->base_url="http://127.0.0.1".(YII_DEBUG?'/ERP-V3/web/':'/');
        $client = new Client(['base_uri' => $this->base_url]);
        $requests = function ($total) use ($client) {
            foreach ($this->data as $key => $cron) {
                $uri = $cron['path'];
                $param=$cron['param']?json_decode($cron['param'],true):[];
                yield function() use ($client, $uri,$param) {
                    return $client->postAsync($uri,['form_params'=>$param]);
                };
            }
        };
        
        $pool = new Pool($client, $requests($this->totalPageCount), [
            'concurrency' => $this->concurrency,
            'fulfilled'   => function ($response, $index){
                $message=$response->getBody()->getContents();
                $res = @json_decode($message);
                if(isset($res->code) && $res->code==200){
                    Model::result($this->data[$index]['id'],true,$message);
                    Model::crontabNext($this->data[$index]);
                }else{
                    $message=preg_replace("/<[^>]+>/","",$message);
                    Model::result($this->data[$index]['id'],false,$message);
                }
                $this->implementMessage($index,$message,isset($res->code)?$res->code:500);

            },
            'rejected' => function ($reason, $index){
                list($message[],$message[],$message[])=explode("\n",$reason);
                $message=preg_replace("/<[^>]+>/","",implode("\n",$message));
                Model::result($this->data[$index]['id'],false,$message);
                $this->implementMessage($index,$message);
            },
        ]);

        // 开始发送请求
        $promise = $pool->promise();
        $promise->wait();
    }
    function implementMessage($index,$message,$code=500){
        echo "[{$code}]\t$index:$message\n=============================\n";
    }
}

 

 

打赏
微信扫一扫,打赏作者吧~
admin :