本文主要讲解关于(二十七)Flask之数据库连接池DBUtils库相关内容,让我们来一起学习下吧!
目录:
- 每篇前言:
- DBUtils库
-
- 模式一(底层使用threading.local实现):
- 模式二:
- Flask中使用
-
- 方式一:直接将DBUtils初始化放到settings.py文件中
- 方式二:从utils文件夹中导入
- 脚本使用DBUtils代码demo:
每篇前言:
??作者介绍:【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者
- ??本文已收录于Flask框架从入门到实战专栏:《Flask框架从入门到实战》
- ??热门专栏推荐:《Python全栈系列教程》、《爬虫从入门到精通系列教程》、《爬虫进阶+实战系列教程》、《Scrapy框架从入门到实战》、《Flask框架从入门到实战》、《Django框架从入门到实战》、《Tornado框架从入门到实战》、《前端系列教程》。
- ??本专栏面向广大程序猿,为的是大家都做到Python全栈技术从入门到精通,穿插有很多实战优化点。
- ??订阅专栏后可私聊进一千多人Python全栈交流群(手把手教学,问题解答); 进群可领取Python全栈教程视频 + 多得数不过来的计算机书籍:基础、Web、爬虫、数据分析、可视化、机器学习、深度学习、人工智能、算法、面试题等。
- ??加入我一起学习进步,一个人可以走的很快,一群人才能走的更远!
引子:
在上一个demo项目中,登录部分验证是直接写死的,本文模拟实际生产,查询MySQL数据库做验证。
一个很low的方法是: 项目根目录下创建utils/sql.py:
import pymysql
class SQLHelper(object):
@staticmethod
def open():
conn = pymysql.connect(host='127.0.0.1', port=3306, password='123456', db='UserInfo')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
@staticmethod
def close(conn, cursor):
conn.commit()
cursor.close()
conn.close()
@classmethod
def fetch_one(cls, sql, args):
conn, cursor = cls.open()
cursor.execute(sql, args)
obj = cursor.fetchone()
cls.close(conn, cursor)
return obj
上面这种用法很严重且明显的一个问题是: 每次登录一次都要和数据库创建一个连接!
解决方法就是使用DBUtils三方库:
DBUtils库
pip install DBUtils==1.3
DBUtils 是一套用于管理数据库连接池的Python包,为高频度高并发的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装。
这种连接池有两种连接模式:
-
PersistentDB:提供线程专用的数据库连接,并自动管理连接。
为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭!
-
PooledDB:提供线程间可共享的数据库连接,并自动管理连接。
创建一批连接到连接池,供所有线程共享使用。
PS:由于pymysql的threadsafety值为1,而DBUtils库用的内部又是用的pymysql,所以该模式连接池中的线程会被所有线程共享。
实测证明 PersistentDB 的速度是最高的(即第一种模式),但是在某些特殊情况下,数据库的连接过程可能异常缓慢,而此时的PooledDB(即模式二,所以推荐这个)则可以提供相对来说平均连接时间比较短的管理方式。
模式一(底层使用threading.local实现):
- 了解即可~~~
from DBUtils.PersistentDB import PersistentDB
import pymysql
POOL = PersistentDB(
creator=pymysql, # 连接数据库的模块
maxusage=None, # 一个数据库连接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如["set time zone ..."]
ping=0, # ping MySQL客户端,检查服务是否可用。 【一般用0,4,7】
# 0 = None = never; 1 = default = whenever if is requested; 2 = when a cursor is created; 4 = when a query is executed; 7 = always
closeable=False, # False:conn.close()实际上被忽略,供下次使用,在线程关闭时,才会自动关闭连接;True:conn.close()则关闭连接,再次调用就是一个新的连接了
threadlocal=None, # 本线程独享值的对象,用于保存链接对象,如果链接对象被重置,也清除
host='127.0.0.1',
port=6379,
user='root',
password='123456',
database='UserInfo',
charset='utf8'
)
使用:
def demo():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from users')
result = cursor.fetchall()
cursor.close()
conn.close()
模式二:
- 用这个~~~
from DBUtils.PooledDB import PooledDB
import pymysql
POOL = PooledDB(
creator=pymysql, # 连接数据库的模块
maxconnections=6, # 连接池允许的最大连接数, 0和None表示不限制
mincached=2, # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
maxcached=5, # 连接池中最多闲置的连接,0和None不限制
maxshared=3, # 连接池中最多共享的连接数量,0和None表示全部共享【默认为0,而且哪怕设置别的值也无用!!!下面会将为啥】
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True:等待;False:不等待直接报错
maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如["set time zone ..."]
ping=0, # ping MySQL客户端,检查服务是否可用。 【一般用0,4,7】
# 0 = None = never; 1 = default = whenever if is requested; 2 = when a cursor is created; 4 = when a query is executed; 7 = always
host='127.0.0.1',
port=6379,
user='root',
password='123456',
database='UserInfo',
charset='utf8'
)
-
为什么设置maxshared参数的值是无用的!因为DBUtils使用的pymysql,而pymysql内部threadsafety值为1。看源码:
import pymysql
DBUtils源码:
from DBUtils.PooledDB import PooledDB
使用:
def demo():
"""
检测当前正在运行连接数是否小于最大连接数;如果不小于则等待或报错:raise TooManyConnection异常
否则则优先去初始化时创建的连接中获取连接:SteadyDBConnection
然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
如果最开始创建的连接没有连接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
一旦关闭连接后,连接就返回到连接池让后续线程继续使用~
"""
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from users')
result = cursor.fetchall()
conn.close()
Flask中使用
方式一:直接将DBUtils初始化放到settings.py文件中
方式二:从utils文件夹中导入
脚本使用DBUtils代码demo:
# coding=utf-8
"""
使用DBUtils数据库连接池中的连接,操作数据库
"""
import datetime
import pymysql
from DBUtils.PooledDB import PooledDB
class MysqlClient(object):
def __init__(self, **kwargs):
self.pool = self.create_pool(**kwargs)
self.connection = None
self.cursor = None
def create_pool(self, **kwargs):
return PooledDB(
pymysql,
mincached=kwargs.get('mincached', 10),
maxcached=kwargs.get('maxcached', 20),
maxshared=kwargs.get('maxshared', 10),
maxconnections=kwargs.get('maxconnections', 200),
blocking=kwargs.get('blocking', True),
maxusage=kwargs.get('maxusage', 100),
setsession=kwargs.get('setsession', None),
reset=kwargs.get('reset', True),
host=kwargs.get('host', '127.0.0.1'),
port=kwargs.get('port', 3306),
db=kwargs.get('db', 'mysqldemo'),
user=kwargs.get('user', 'root'),
passwd=kwargs.get('passwd', '123456'),
charset=kwargs.get('charset', 'utf8mb4'),
cursorclass=pymysql.cursors.DictCursor
)
def get_conn_cursor(self):
self.connection = self.pool.connection()
self.cursor = self.connection.cursor()
def close(self):
try:
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
except Exception as e:
print(e)
def execute(self, sql, param=()):
try:
self.get_conn_cursor()
count = self.cursor.execute(sql, param)
print(count)
return count
finally:
self.close()
def __dict_datetime_obj_to_str(self, result_dict):
"""把字典里面的datetime对象转成字符串,使json转换不出错"""
if result_dict:
return {k: v.__str__() if isinstance(v, datetime.datetime) else v for k, v in result_dict.items()}
return result_dict
def select_one(self, sql, param=()):
"""查询单个结果"""
try:
self.get_conn_cursor()
count = self.execute(sql, param)
result = self.cursor.fetchone()
result = self.__dict_datetime_obj_to_str(result)
return count, result
finally:
self.close()
def select_many(self, sql, param=()):
"""查询多个结果"""
try:
self.get_conn_cursor()
count = self.execute(sql, param)
result = self.cursor.fetchall()
result = [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
return count, result
finally:
self.close()
def begin(self):
"""开启事务"""
try:
self.get_conn_cursor()
self.connection.autocommit(False)
except Exception as e:
print(e)
def end(self, option='commit'):
"""结束事务"""
try:
if option == 'commit':
self.connection.commit()
else:
self.connection.rollback()
except Exception as e:
print(e)
finally:
self.connection.autocommit(True)
if __name__ == "__main__":
mc = MysqlClient()
sql1 = 'SELECT * FROM customers WHERE customerNumber = 103'
result1 = mc.select_one(sql1)
print(result1[1])
sql2 = 'SELECT * FROM customers WHERE customerNumber IN (%s,%s,%s)'
param = (103, 144, 145)
print(mc.select_many(sql2, param)[1])
以上就是关于(二十七)Flask之数据库连接池DBUtils库相关的全部内容,希望对你有帮助。欢迎持续关注程序员导航网,学习愉快哦!