(二十七)Flask之数据库连接池DBUtils库

本文主要讲解关于(二十七)Flask之数据库连接池DBUtils库相关内容,让我们来一起学习下吧!

目录:

  • 每篇前言:
  • DBUtils库
    • 模式一(底层使用threading.local实现):
    • 模式二:
  • Flask中使用
    • 方式一:直接将DBUtils初始化放到settings.py文件中
    • 方式二:从utils文件夹中导入
  • 脚本使用DBUtils代码demo:

每篇前言:

  • ??作者介绍:【孤寒者】—CSDN全栈领域优质创作者、HDZ核心组成员、华为云享专家Python全栈领域博主、CSDN原力计划作者

  • ??本文已收录于Flask框架从入门到实战专栏:《Flask框架从入门到实战》
  • ??热门专栏推荐:《Python全栈系列教程》、《爬虫从入门到精通系列教程》、《爬虫进阶+实战系列教程》、《Scrapy框架从入门到实战》、《Flask框架从入门到实战》、《Django框架从入门到实战》、《Tornado框架从入门到实战》、《前端系列教程》。
  • ?​?本专栏面向广大程序猿,为的是大家都做到Python全栈技术从入门到精通,穿插有很多实战优化点。
  • ??订阅专栏后可私聊进一千多人Python全栈交流群(手把手教学,问题解答); 进群可领取Python全栈教程视频 + 多得数不过来的计算机书籍:基础、Web、爬虫、数据分析、可视化、机器学习深度学习人工智能算法面试题等。
  • ??加入我一起学习进步,一个人可以走的很快,一群人才能走的更远!

(二十七)Flask之数据库连接池DBUtils库

引子:

在上一个demo项目中,登录部分验证是直接写死的,本文模拟实际生产,查询MySQL数据库做验证。

一个很low的方法是: 项目根目录下创建utils/sql.py:

import pymysql


class SQLHelper(object):

    @staticmethod
    def open():
        conn = pymysql.connect(host='127.0.0.1', port=3306, password='123456', db='UserInfo')
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        return conn, cursor

    @staticmethod
    def close(conn, cursor):
        conn.commit()
        cursor.close()
        conn.close()

    @classmethod
    def fetch_one(cls, sql, args):
        conn, cursor = cls.open()
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn, cursor)
        return obj

(二十七)Flask之数据库连接池DBUtils库

上面这种用法很严重且明显的一个问题是: 每次登录一次都要和数据库创建一个连接!

解决方法就是使用DBUtils三方库:

DBUtils库

pip install DBUtils==1.3

DBUtils 是一套用于管理数据库连接池的Python包,为高频度高并发的数据库访问提供更好的性能,可以自动管理连接对象的创建和释放。并允许对非线程安全的数据库接口进行线程安全包装。

这种连接池有两种连接模式:

  1. PersistentDB:提供线程专用的数据库连接,并自动管理连接。

    为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭!

  2. PooledDB:提供线程间可共享的数据库连接,并自动管理连接。

    创建一批连接到连接池,供所有线程共享使用。

    PS:由于pymysql的threadsafety值为1,而DBUtils库用的内部又是用的pymysql,所以该模式连接池中的线程会被所有线程共享。

实测证明 PersistentDB 的速度是最高的(即第一种模式),但是在某些特殊情况下,数据库的连接过程可能异常缓慢,而此时的PooledDB(即模式二,所以推荐这个)则可以提供相对来说平均连接时间比较短的管理方式。

模式一(底层使用threading.local实现):

  • 了解即可~~~
from DBUtils.PersistentDB import PersistentDB
import pymysql

POOL = PersistentDB(
    creator=pymysql,       # 连接数据库的模块
    maxusage=None,         # 一个数据库连接最多被重复使用的次数,None表示无限制
    setsession=[],         # 开始会话前执行的命令列表。如["set time zone ..."]
    ping=0,                # ping MySQL客户端,检查服务是否可用。  【一般用0,4,7】
    # 0 = None = never;  1 = default = whenever if is requested;  2 = when a cursor is created; 4 = when a query is executed;  7 = always
    closeable=False,       # False:conn.close()实际上被忽略,供下次使用,在线程关闭时,才会自动关闭连接;True:conn.close()则关闭连接,再次调用就是一个新的连接了
    threadlocal=None,      # 本线程独享值的对象,用于保存链接对象,如果链接对象被重置,也清除
    host='127.0.0.1',
    port=6379,
    user='root',
    password='123456',
    database='UserInfo',
    charset='utf8'
)

使用:

def demo():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from users')
    result = cursor.fetchall()
    cursor.close()  
    conn.close()
    

模式二:

  • 用这个~~~
from DBUtils.PooledDB import PooledDB
import pymysql

POOL = PooledDB(
    creator=pymysql,       # 连接数据库的模块
    maxconnections=6,      # 连接池允许的最大连接数, 0和None表示不限制
    mincached=2,           # 初始化时,连接池中至少创建的空闲的连接,0表示不创建
    maxcached=5,           # 连接池中最多闲置的连接,0和None不限制
    maxshared=3,           # 连接池中最多共享的连接数量,0和None表示全部共享【默认为0,而且哪怕设置别的值也无用!!!下面会将为啥】
    blocking=True,         # 连接池中如果没有可用连接后,是否阻塞等待。True:等待;False:不等待直接报错
    maxusage=None,         # 一个连接最多被重复使用的次数,None表示无限制
    setsession=[],         # 开始会话前执行的命令列表。如["set time zone ..."]
    ping=0,                # ping MySQL客户端,检查服务是否可用。  【一般用0,4,7】
    # 0 = None = never;  1 = default = whenever if is requested;  2 = when a cursor is created; 4 = when a query is executed;  7 = always

    host='127.0.0.1',
    port=6379,
    user='root',
    password='123456',
    database='UserInfo',
    charset='utf8'
)


  • 为什么设置maxshared参数的值是无用的!因为DBUtils使用的pymysql,而pymysql内部threadsafety值为1。看源码:

    import pymysql
    

    (二十七)Flask之数据库连接池DBUtils库

    DBUtils源码:

    from DBUtils.PooledDB import PooledDB
    

    (二十七)Flask之数据库连接池DBUtils库

使用:

def demo():
	"""
	检测当前正在运行连接数是否小于最大连接数;如果不小于则等待或报错:raise TooManyConnection异常
	否则则优先去初始化时创建的连接中获取连接:SteadyDBConnection
	然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
	如果最开始创建的连接没有连接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
	
	一旦关闭连接后,连接就返回到连接池让后续线程继续使用~
	"""
    conn = POOL.connection()
    
    cursor = conn.cursor()
    cursor.execute('select * from users')
    result = cursor.fetchall()
    conn.close()

Flask中使用

方式一:直接将DBUtils初始化放到settings.py文件中

(二十七)Flask之数据库连接池DBUtils库(二十七)Flask之数据库连接池DBUtils库

方式二:从utils文件夹中导入

(二十七)Flask之数据库连接池DBUtils库(二十七)Flask之数据库连接池DBUtils库

脚本使用DBUtils代码demo:

# coding=utf-8
"""
使用DBUtils数据库连接池中的连接,操作数据库
"""
import datetime
import pymysql
from DBUtils.PooledDB import PooledDB


class MysqlClient(object):
    def __init__(self, **kwargs):
        self.pool = self.create_pool(**kwargs)
        self.connection = None
        self.cursor = None

    def create_pool(self, **kwargs):
        return PooledDB(
            pymysql,
            mincached=kwargs.get('mincached', 10),
            maxcached=kwargs.get('maxcached', 20),
            maxshared=kwargs.get('maxshared', 10),
            maxconnections=kwargs.get('maxconnections', 200),
            blocking=kwargs.get('blocking', True),
            maxusage=kwargs.get('maxusage', 100),
            setsession=kwargs.get('setsession', None),
            reset=kwargs.get('reset', True),
            host=kwargs.get('host', '127.0.0.1'),
            port=kwargs.get('port', 3306),
            db=kwargs.get('db', 'mysqldemo'),
            user=kwargs.get('user', 'root'),
            passwd=kwargs.get('passwd', '123456'),
            charset=kwargs.get('charset', 'utf8mb4'),
            cursorclass=pymysql.cursors.DictCursor
        )

    def get_conn_cursor(self):
        self.connection = self.pool.connection()
        self.cursor = self.connection.cursor()

    def close(self):
        try:
            if self.cursor:
                self.cursor.close()
            if self.connection:
                self.connection.close()
        except Exception as e:
            print(e)

    def execute(self, sql, param=()):
        try:
            self.get_conn_cursor()
            count = self.cursor.execute(sql, param)
            print(count)
            return count
        finally:
            self.close()

    def __dict_datetime_obj_to_str(self, result_dict):
        """把字典里面的datetime对象转成字符串,使json转换不出错"""
        if result_dict:
            return {k: v.__str__() if isinstance(v, datetime.datetime) else v for k, v in result_dict.items()}
        return result_dict

    def select_one(self, sql, param=()):
        """查询单个结果"""
        try:
            self.get_conn_cursor()
            count = self.execute(sql, param)
            result = self.cursor.fetchone()
            result = self.__dict_datetime_obj_to_str(result)
            return count, result
        finally:
            self.close()

    def select_many(self, sql, param=()):
        """查询多个结果"""
        try:
            self.get_conn_cursor()
            count = self.execute(sql, param)
            result = self.cursor.fetchall()
            result = [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
            return count, result
        finally:
            self.close()

    def begin(self):
        """开启事务"""
        try:
            self.get_conn_cursor()
            self.connection.autocommit(False)
        except Exception as e:
            print(e)

    def end(self, option='commit'):
        """结束事务"""
        try:
            if option == 'commit':
                self.connection.commit()
            else:
                self.connection.rollback()
        except Exception as e:
            print(e)
        finally:
            self.connection.autocommit(True)


if __name__ == "__main__":
    mc = MysqlClient()
    sql1 = 'SELECT * FROM customers  WHERE  customerNumber = 103'
    result1 = mc.select_one(sql1)
    print(result1[1])

    sql2 = 'SELECT * FROM customers  WHERE  customerNumber IN (%s,%s,%s)'
    param = (103, 144, 145)
    print(mc.select_many(sql2, param)[1])

以上就是关于(二十七)Flask之数据库连接池DBUtils库相关的全部内容,希望对你有帮助。欢迎持续关注程序员导航网,学习愉快哦!

版权声明:csdnhot 发表于 2024-04-13 19:25:45。
转载请注明:(二十七)Flask之数据库连接池DBUtils库 | 程序员导航网

暂无评论

您必须登录才能参与评论!
立即登录
暂无评论...