Background
I was recently investigating an issue that at first looked like a deadlock. It had shown up occasionally in production before, and while digging into it I fixed several other problems it had triggered, but the deadlock itself could never be reproduced locally and I couldn't find any spot in the code that looked capable of deadlocking. Then more than a dozen overseas production sites hit the same kind of problem, so it suddenly became a priority and I could finally focus on tracking it down.
Business scenario
To play a film, a cinema needs not only the film content itself but also a KDM file. The KDM acts as a key: without it the projector will not play the film. Our TMS receives large batches of KDM files within a short period and has to deliver them to the corresponding devices, and every delivery acquires a lock while it updates the database.
Simulating the problem
Environment
python: 2.6.6
sqlalchemy: 1.0.17
postgres: 9.3.25
OS: CentOS release 6.10
Tables
test_cpl
test_kdm
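The post does not show the DDL for these two tables. Assuming they simply mirror the ORM models defined in orm.py below, they can be created directly from the SQLAlchemy metadata, for example:

# create_tables.py - minimal sketch, assuming the models in db/orm.py below
from db.orm import Base, some_engine

# Emits CREATE TABLE for test_cpl and test_kdm if they do not exist yet
Base.metadata.create_all(some_engine)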
Code files
Directory layout:

db/
    __init__.py
    orm.py
test_deadlock.py
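The content of db/__init__.py is not shown; presumably it only turns db into a package and can stay empty:

# db/__init__.py
# Intentionally empty: it only marks "db" as an importable package.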
# orm.py
import logging
import random
import uuid
from threading import RLock

from sqlalchemy import create_engine, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.types import INT, String

Base = declarative_base()
some_engine = create_engine('postgresql://postgres:postgres@172.18.0.2/tms_keal',
                            pool_size=20, max_overflow=40)
# scoped_session: every thread that uses Session gets its own session object
Session = scoped_session(sessionmaker(bind=some_engine))


class MyRlock(object):
    """RLock wrapper that logs every acquire/release (handy when tracing locks)."""

    def __init__(self):
        self.lock = RLock()

    def __enter__(self):
        logging.info('++acquiring lock!')
        self.lock.__enter__()
        logging.info('++acquired lock!')

    def __exit__(self, *args):
        logging.info('++releasing lock!')
        self.lock.__exit__(*args)
        logging.info('++released lock!')


global_lock = RLock()


def generate_random_string(length, type_=None):
    """Return a random string of the given length drawn from the chosen alphabet."""
    if type_ == "num":
        seed = "1234567890"
    elif type_ == "abc":
        seed = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    elif type_ == "word":
        seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    else:
        seed = "1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@$%^&*()_+=-"
    string_list = []
    for i in range(0, length):
        string_list.append(random.choice(seed))
    return "".join(string_list)


def generate_uuid():
    return str(uuid.uuid4())


class TestCpl(Base):
    __tablename__ = 'test_cpl'
    uuid = Column(String, primary_key=True, default=generate_uuid)
    name = Column(String)
    rank = Column(INT)
    # kdms = relationship('TestKdm')

    def from_dict(self, d):
        self.uuid = d['uuid']
        self.name = d['name']
        self.rank = d['rank']

    def update(self, d):
        for k, v in d.items():
            setattr(self, k, v)


class TestKdm(Base):
    __tablename__ = 'test_kdm'
    # generate_uuid() returns a string, so the primary key is declared as String
    uuid = Column(String, primary_key=True, default=generate_uuid)
    username = Column(String)
    password = Column(String)
    cpl_uuid = Column(String)

    def from_dict(self, d):
        for k, v in d.items():
            setattr(self, k, v)

    def update(self, d):
        for k, v in d.items():
            setattr(self, k, v)
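One property of Session that matters for the reproduction below: it is a scoped_session, so every thread that touches it gets its own session object and therefore its own database transaction. A quick illustrative check (not part of the original post):

# scoped_session_demo.py - each thread sees a different Session instance
import threading
from db.orm import Session

def show_session():
    # Session() returns the session registered for the current thread
    print("%s -> %r" % (threading.current_thread().name, Session()))

t1 = threading.Thread(target=show_session)
t2 = threading.Thread(target=show_session)
t1.start(); t2.start()
t1.join(); t2.join()
# The two printed objects differ, which is why the two worker methods in
# test_deadlock.py below end up in two separate PostgreSQL transactions.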
# test_deadlock.py
import logging
import random
import time
from threading import Thread, RLock

from db.orm import (generate_uuid, TestKdm, TestCpl,
                    generate_random_string, global_lock, Session)

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(filename)s[line:%(lineno)d]::[%(threadName)s] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='no_commit.log',
                    filemode='a')


class Device(object):
    def __init__(self):
        # The simulation reuses a single uuid per list, so both worker threads
        # always touch the same cpl/kdm rows.
        self.kdm_uuids = [generate_uuid() for i in range(1)]
        self.cpl_uuids = [generate_uuid() for i in range(1)]
        self.kdm_lock = RLock()
        self.sync_status = False

    def content_add_key(self):
        # Stages the rows first (autoflush enabled), then takes kdm_lock only
        # for the commit.
        try:
            session = Session
            # while 1:
            logging.info('++Add')
            self._new_kdm(session)
            time.sleep(30)
            with self.kdm_lock:
                session.commit()
        except Exception as err:
            logging.info('++++content_add_key')
            logging.info(err)

    def device_sync_key_information(self):
        # Holds kdm_lock for the whole unit of work, with autoflush disabled.
        session = Session
        # while 1:
        try:
            with self.kdm_lock:
                with Session.no_autoflush:
                    logging.info('++Sync')
                    self._new_kdm(session)
                    time.sleep(25)
                    session.commit()
        except Exception as err:
            logging.info('++++device_sync_key_information')
            logging.info(err)

    def keep_session(self):
        session = Session()
        while 1:
            kdms = session.query(TestKdm).all()
            print(len(kdms))
            time.sleep(10)

    def _new_kdm(self, session):
        uuid = self.kdm_uuids[0]
        username = generate_random_string(length=4, type_='word')
        password = generate_random_string(length=8, type_='num')
        cpl_uuid = self.cpl_uuids[0]
        logging.info(cpl_uuid)
        cpl_name = generate_random_string(length=3, type_='word')
        cpl_rank = random.randint(1, 20)
        kdm = TestKdm()
        kdm.from_dict(dict(uuid=uuid,
                           username=username,
                           password=password,
                           cpl_uuid=cpl_uuid))
        cpl = TestCpl()
        cpl.from_dict(dict(uuid=cpl_uuid,
                           name=cpl_name,
                           rank=cpl_rank))
        session.merge(cpl)
        session.merge(kdm)


def start_one_device():
    device = Device()
    t1 = Thread(target=device.content_add_key)
    t2 = Thread(target=device.device_sync_key_information)
    t1.start()
    time.sleep(10)
    t2.start()


def main(device_num):
    for i in range(device_num):
        start_one_device()


if __name__ == '__main__':
    import os
    print(os.getpid())
    main(1)
After running test_deadlock.py:
The PostgreSQL server log shows two transactions that issued BEGIN but never completed, and one of them is stuck waiting to acquire a ShareLock.
pg_stat_activity shows one connection sitting in 'idle in transaction' and another in 'active' with waiting = true, meaning it is blocked waiting for a lock.
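To see this yourself you can query pg_stat_activity. The sketch below assumes PostgreSQL 9.3, where waiting is still a boolean column (9.6 and later replaced it with wait_event), and reuses the engine from orm.py:

# check_activity.py - sketch for PostgreSQL 9.3 (uses the old "waiting" column)
from db.orm import some_engine

rows = some_engine.execute(
    "SELECT pid, state, waiting, query "
    "FROM pg_stat_activity "
    "WHERE datname = 'tms_keal'"
)
for pid, state, waiting, query in rows:
    # expected: one backend 'idle in transaction', one 'active' with waiting = true
    print("pid=%s state=%s waiting=%s query=%s" % (pid, state, waiting, query))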
pg_locks shows that transaction 7/22104 already holds the ExclusiveLock on transaction id 808585 (granted = true) and is trying to acquire a ShareLock on 808584 (granted = false), while transaction 6/36907 already holds the ExclusiveLock on 808584, so 7/22104 cannot obtain its ShareLock. The individual lock modes are explained in the PostgreSQL locking documentation.
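A quick way to reproduce that view is to filter pg_locks for transaction-id locks; again a hedged sketch using the same engine:

# check_locks.py - sketch: list transaction-id locks and whether they were granted
from db.orm import some_engine

rows = some_engine.execute(
    "SELECT locktype, transactionid, virtualtransaction, mode, granted, pid "
    "FROM pg_locks "
    "WHERE locktype = 'transactionid' "
    "ORDER BY pid"
)
for row in rows:
    # the blocked backend shows mode = 'ShareLock' with granted = false
    print(row)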
(Each column of the pg_locks view is described in detail in the PostgreSQL documentation.)
Cause of the deadlock
In content_add_key(), unlike device_sync_key_information(), the session is not wrapped in Session.no_autoflush, so SQLAlchemy automatically calls flush() when a query is issued during merge(), writing the session's pending changes to the database. Inside _new_kdm(), the session.merge(kdm) call therefore flushes the earlier session.merge(cpl) to the database, which takes a row-level lock on that cpl row and blocks any later transaction that tries to modify the same row. device_sync_key_information() also calls _new_kdm(), and the simulation deliberately reuses the same uuid, so once both threads are running they try to update the same cpl row. Meanwhile content_add_key() tries to acquire kdm_lock just before committing, but that lock is held by the thread running device_sync_key_information(), which is itself blocked inside the database, and the result is a deadlock.
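Stripped of the SQLAlchemy and PostgreSQL details, this is the classic two-resource deadlock, except that one resource is a database lock and the other is a Python RLock, so neither side can see the whole cycle. The sketch below (illustrative names, a plain threading.Lock standing in for the row lock) reproduces just the shape of the problem; running it simply hangs, which is the point:

# deadlock_shape.py - illustrative only: row_lock stands in for the PostgreSQL
# lock taken by the autoflushed write to test_cpl, kdm_lock plays the same
# role as Device.kdm_lock.
import threading
import time

row_lock = threading.Lock()
kdm_lock = threading.RLock()

def content_add_key():
    row_lock.acquire()    # autoflush during merge(): DB row lock taken
    time.sleep(1)
    with kdm_lock:        # commit path needs the Python lock held by the other thread
        row_lock.release()

def device_sync_key_information():
    with kdm_lock:        # the sync path takes the Python lock first...
        time.sleep(1)
        with row_lock:    # ...then blocks on the "row lock" inside the database
            pass

threading.Thread(target=content_add_key).start()
threading.Thread(target=device_sync_key_information).start()
# Thread A holds row_lock and waits for kdm_lock; thread B holds kdm_lock and
# waits for row_lock. Neither PostgreSQL (which sees only one blocked backend)
# nor a Python-only view of kdm_lock can detect the combined cycle.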
Takeaways:
- When working with SQLAlchemy sessions, be aware of the hidden side effects of having autoflush enabled.
- Keep each session's unit of work atomic, and avoid acquiring a lock owned by another thread in the middle of an open transaction. A deadlock that lives purely in Python, or purely in the database, is usually easy to reproduce and diagnose. The nasty part of this case is that the two are combined: there is no visible deadlock structure in the code, and it never trips PostgreSQL's deadlock detector either. A sketch of one way to restructure the locking follows.
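Sticking to the structures the example already uses, one way out is to make sure a thread never waits on the Python lock while it still sits inside an open database transaction holding row locks; for instance, take kdm_lock around the entire unit of work in every code path, so the order is always Python lock first, then database. A hedged sketch of content_add_key rewritten that way (not the original fix):

# Sketch only: drop-in replacement inside test_deadlock.py, same imports as above.
def content_add_key(self):
    session = Session
    try:
        with self.kdm_lock:              # Python lock covers the whole unit of work
            with session.no_autoflush:   # no surprise flushes during merge()
                self._new_kdm(session)
            session.commit()             # row locks released while still holding kdm_lock
    except Exception as err:
        session.rollback()               # never leave the transaction hanging open
        logging.info(err)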