Debugging a deadlock between Python and the database

Posted by Keal on January 15, 2021

Background

I was recently debugging an issue that looked like a deadlock. It had shown up occasionally in production before; while investigating it I fixed several other problems it had exposed, but I could never reproduce the deadlock itself locally, nor find an obvious deadlock pattern in the code. Then a dozen or so overseas production environments hit the same thing, the issue suddenly became a priority, and I could finally focus on it.

Business scenario

To play a movie, a cinema needs more than the content itself: it also needs a KDM file, which acts as a key that authorizes the projector to play the film. Our TMS received a large batch of KDM files within a short window and had to push them to the corresponding devices; every push acquires a lock before updating the database.

Reproducing the scenario

Environment

python: 2.6.6

sqlalchemy: 1.0.17

postgres: 9.3.25

OS: CentOS release 6.10

Tables

The test_cpl table

[Screenshot: test_cpl table definition]

The test_kdm table

[Screenshot: test_kdm table definition]

Code

Directory layout:

- db
  - orm.py
  - __init__.py
- test_deadlock.py

# orm.py
import random

from sqlalchemy.orm import scoped_session, sessionmaker

from sqlalchemy import create_engine, Column
from sqlalchemy.types import INT, String
from sqlalchemy.ext.declarative import declarative_base
from threading import RLock
import uuid
import logging

Base = declarative_base()
some_engine = create_engine('postgresql://postgres:postgres@172.18.0.2/tms_keal', pool_size=20, max_overflow=40)

Session = scoped_session(sessionmaker(bind=some_engine))


class MyRlock():

    def __init__(self):
        self.lock = RLock()

    def __enter__(self):
        logging.info('++acquiring lock!')
        self.lock.__enter__()
        logging.info('++acquired lock!')

    def __exit__(self, *args):
        logging.info('++releasing lock!')
        self.lock.__exit__(*args)
        logging.info('++released lock!')


global_lock = RLock()


def generate_random_string(length, type_=None):
    if type_ == "num":
        seed = "1234567890"
    elif type_ == "abc":
        seed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    elif type_ == "word":
        seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    else:
        seed = (
            "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@$%^&*()_+=-"
        )
    string_list = list()
    for i in range(0, length):
        string_list.append(random.choice(seed))
    result = "".join(string_list)
    return result


def generate_uuid():
    return str(uuid.uuid4())


class TestCpl(Base):
    __tablename__ = 'test_cpl'

    uuid = Column(String, primary_key=True, default=generate_uuid)
    name = Column(String)
    rank = Column(INT)

    # kdms = relationship('TestKdm')

    def from_dict(self, d):
        self.uuid = d['uuid']
        self.name = d['name']
        self.rank = d['rank']

    def update(self, d):
        for k, v in d.items():
            setattr(self, k, v)


class TestKdm(Base):
    __tablename__ = 'test_kdm'

    uuid = Column(String, primary_key=True, default=generate_uuid)  # String, not INT: generate_uuid returns a uuid4 string
    username = Column(String)
    password = Column(String)
    cpl_uuid = Column(String)

    def from_dict(self, d):
        for k, v in d.items():
            setattr(self, k, v)

    def update(self, d):
        for k, v in d.items():
            setattr(self, k, v)
# test_deadlock.py
from threading import Thread, RLock
import random
import logging
import time
from db.orm import generate_uuid, TestKdm, TestCpl, generate_random_string, global_lock, Session

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d]::[%(threadName)s] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='no_commit.log',
                    filemode='a'
                    )


class Device():

    def __init__(self):
        self.kdm_uuids = [generate_uuid() for i in range(1)]
        self.cpl_uuids = [generate_uuid() for i in range(1)]
        self.kdm_lock = RLock()
        self.sync_status = False

    def content_add_key(self):
        try:
            session = Session
            # while 1:
            logging.info('++Add')
            self._new_kdm(session)
            time.sleep(30)
            with self.kdm_lock:
                session.commit()
        except Exception as err:
            logging.info('++++content_add_key')
            logging.info(err)

    def device_sync_key_information(self):
        session = Session
        # while 1:
        try:
            with self.kdm_lock:
                with Session.no_autoflush:
                    logging.info('++Sync')
                    self._new_kdm(session)
                    time.sleep(25)
                    session.commit()


        except Exception as err:
            logging.info('++++device_sync_key_information')
            logging.info(err)

    def keep_session(self):
        session = Session()
        while 1:
            kdms = session.query(TestKdm).all()
            print(len(kdms))
            time.sleep(10)

    def _new_kdm(self, session):
        uuid = self.kdm_uuids[0]
        username = generate_random_string(length=4, type_='word')
        password = generate_random_string(length=8, type_='num')
        cpl_uuid = self.cpl_uuids[0]
        logging.info(cpl_uuid)
        cpl_name = generate_random_string(length=3, type_='word')
        cpl_rank = random.randint(1, 20)
        kdm = TestKdm()
        kdm.from_dict(dict(uuid=uuid,
                           username=username,
                           password=password,
                           cpl_uuid=cpl_uuid))
        cpl = TestCpl()
        cpl.from_dict(dict(uuid=cpl_uuid,
                           name=cpl_name,
                           rank=cpl_rank))
        session.merge(cpl)
        session.merge(kdm)

def start_one_device():
    device = Device()
    t1 = Thread(target=device.content_add_key)
    t2 = Thread(target=device.device_sync_key_information)
    t1.start()
    time.sleep(10)
    t2.start()


def main(device_num):
    for i in range(device_num):
        start_one_device()

if __name__ == '__main__':
    import os

    print(os.getpid())
    main(1)
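One detail worth calling out before running this: `session = Session` binds the scoped_session registry, not a concrete session, and scoped_session keeps one session per thread. That is why the two threads end up in two separate database transactions, matching the two BEGINs seen in the logs below. A minimal sketch (in-memory sqlite, names hypothetical, not the post's code) illustrating the per-thread behavior:

```python
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine('sqlite://')
Session = scoped_session(sessionmaker(bind=engine))

sessions = []

def grab_session():
    # Session() returns the session bound to the *current* thread's registry
    sessions.append(Session())

threads = [threading.Thread(target=grab_session) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sessions[0] is sessions[1])  # False: one distinct session per thread
```

So even though both worker threads reference the same `Session` name, each one opens and holds its own transaction against PostgreSQL.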

After running test_deadlock.py

The PostgreSQL log shows two transactions that issued BEGIN but never completed, and one transaction stuck waiting to acquire a ShareLock:

[Screenshot: PostgreSQL log with two open transactions, one blocked on a ShareLock]

In pg_stat_activity, one connection is "idle in transaction" and another is "active" with waiting set to true, meaning it is waiting for a lock:

[Screenshot: pg_stat_activity showing one "idle in transaction" connection and one "active" connection with waiting=true]

In pg_locks, transaction 7/22104 has already been granted the ExclusiveLock on transaction id 808585 (granted is true) and is trying to take a ShareLock on 808584 (granted is false). Transaction 6/36907 already holds the ExclusiveLock on 808584, so 7/22104 cannot get its ShareLock. See the PostgreSQL locking documentation for a detailed explanation of the lock modes.

[Screenshot: pg_locks showing 7/22104 granted the ExclusiveLock on 808585 and waiting on a ShareLock for 808584, held as an ExclusiveLock by 6/36907]

For reference, the pg_locks column descriptions from the PostgreSQL documentation:

[Screenshot: pg_locks column descriptions from the PostgreSQL documentation]

Root cause

In content_add_key(), Session.no_autoflush is not used the way it is in device_sync_key_information(), so SQLAlchemy autoflushes on queries and on merge, pushing the session's pending changes to the database. Inside _new_kdm(), the session.merge(kdm) call therefore flushes the earlier session.merge(cpl) to the database, which takes a row-level lock on that cpl row and blocks any later transaction that tries to modify the same row. device_sync_key_information() also calls _new_kdm(), and the demo code deliberately reuses the same uuid, so once both threads are running they try to update the same cpl row. Meanwhile content_add_key() tries to acquire kdm_lock before committing, but that lock is held by the thread running device_sync_key_information(). The result is a deadlock.

[Diagram: the cross-wait between the Python-level RLock and the PostgreSQL row lock]

Takeaways:

  1. When using a SQLAlchemy session, be aware of the hidden side effects of autoflush.
  2. Keep each session's work atomic, and avoid acquiring another thread's lock in the middle of a transaction. A deadlock purely in Python, or purely in the database, is relatively easy to reproduce and diagnose. But in a case like this one, where the two combine, there is no visible deadlock structure in the code, and PostgreSQL's deadlock detector never fires, because each waiter is blocked in a different system.
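The first point is easy to see even without PostgreSQL. A minimal sketch against in-memory sqlite (model and names are hypothetical, reasonably recent SQLAlchemy assumed): inside a no_autoflush block a query does not flush pending changes, while the same query outside flushes them first, and on a real server that flush is exactly what takes the row lock:

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Cpl(Base):
    __tablename__ = 'cpl'
    uuid = Column(String, primary_key=True)
    rank = Column(Integer)

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(Cpl(uuid='c1', rank=1))  # pending in the session, not yet flushed

with session.no_autoflush:
    # flush is suppressed: the pending INSERT has not hit the database yet,
    # so on a real server no row lock would be taken at this point
    rows_inside = session.query(Cpl).count()

# autoflush is back on: this query flushes the pending INSERT first
rows_outside = session.query(Cpl).count()

print(rows_inside, rows_outside)  # 0 1
```

The difference between 0 and 1 is the hidden flush: with autoflush on, an innocent-looking query or merge can write to the database, and take locks, long before commit() is called.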
