数据库

MySQL

安装驱动


pip install mysql-connector-python

# 如果上面的命令安装失败,可以试试另一个驱动
pip install mysql-connector

数据库操作

import mysql.connector
import random

# Open a connection to the `test` database with the given credentials.
conn = mysql.connector.connect(user='root', password='12345678', database='test')

# Insert: add two rows and remember the id generated for each one.
cursor = conn.cursor()
cursor.execute("insert into python_test (name) values (%s)", ['test' + str(random.randint(1, 100))])
# Id of the row that was just inserted
lastInsertId1 = cursor.lastrowid
cursor.execute("insert into python_test (name) values (%s)", ['test' + str(random.randint(1, 100))])
# Id of the row that was just inserted
lastInsertId2 = cursor.lastrowid
# Commit the inserts
conn.commit()
# Close the cursor
cursor.close()

# Update: rename the second inserted row.
cursor = conn.cursor()
cursor.execute("update python_test set name=%s where id = %s", ['new_test' + str(random.randint(1, 100)), lastInsertId2])
conn.commit()
cursor.close()

# Delete: remove the first inserted row.
cursor = conn.cursor()
cursor.execute("delete from python_test where id = %s", [lastInsertId1])
conn.commit()
cursor.close()

# Select: read back every row.
cursor = conn.cursor()
cursor.execute("select * from python_test")
res = cursor.fetchall()
print(res)
# The query cursor must be closed as well, like the cursors above.
cursor.close()

# Release the connection once all work is done.
conn.close()

一个简单的数据库封装

import mysql.connector
import config


class Db(object):
    """Small convenience wrapper around ``mysql.connector``.

    Outside a transaction every operation opens its own connection, commits
    (for writes) and closes that connection again. Between
    ``begin_transaction()`` and ``commit()`` / ``rollback()`` all operations
    share a single connection and nothing is committed until ``commit()``.

    Table and column names are interpolated into the SQL text because
    identifiers cannot be bound as parameters, so they must come from trusted
    code, never from user input. Values are always bound as parameters.
    """

    # Name of the database profile; only 'default' can be connected to.
    database = 'default'
    # Connection currently held by this object, or None.
    conn = None
    # True between begin_transaction() and commit()/rollback().
    is_transaction = False

    def __init__(self, database='default'):
        """Remember which database profile this object should connect to."""
        self.database = database

    def begin_transaction(self):
        """Open a connection and keep it for all following operations."""
        self.get_conn()
        self.is_transaction = True

    def get_conn(self):
        """Return the connection to use for the next operation.

        Outside a transaction a new connection is opened from the settings in
        ``config``; inside a transaction the shared connection is returned.

        Raises:
            BaseException: if the database profile is not 'default', or if
                the transaction's connection is gone.
        """
        if self.is_transaction is False:
            if self.database == 'default':
                self.conn = mysql.connector.connect(user=config.DB_USER, password=config.DB_PASSWORD, database=config.DB_DATABASE,
                                                    host=config.DB_HOST, port=config.DB_PORT, connection_timeout=10)
                return self.conn
            else:
                raise BaseException('获取数据库连接失败')
        else:
            if self.conn is None:
                raise BaseException('数据库连接丢失')
            return self.conn

    @staticmethod
    def _placeholders(count):
        """Return one parenthesised group of ``count`` ``%s`` markers."""
        return "(" + ",".join(["%s"] * count) + ")"

    @staticmethod
    def _run(cursor, sql):
        """Execute ``sql``, which is a plain string or a ``(sql, params)`` tuple."""
        if isinstance(sql, tuple):
            cursor.execute(sql[0], sql[1])
        else:
            cursor.execute(sql)

    def _release(self, conn, cursor):
        """Close the cursor and, outside a transaction, the connection too."""
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if self.is_transaction is False:
                self.close(conn)

    def insert(self, db_name, data_dict):
        """Insert one row into table ``db_name`` and return its generated id.

        Args:
            db_name: table name (trusted, interpolated into the SQL).
            data_dict: mapping of column name to value.

        Returns:
            The cursor's ``lastrowid`` after the insert.
        """
        columns = "(`" + "`,`".join(data_dict) + "`)"
        sql = """ INSERT INTO %s %s VALUES %s """ % (db_name, columns, self._placeholders(len(data_dict)))

        conn = self.get_conn()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(sql, tuple(data_dict.values()))
            new_id = cursor.lastrowid
            if self.is_transaction is False:
                conn.commit()
            return new_id
        finally:
            # Runs on failure as well, so a failed statement cannot leak the
            # cursor or the per-call connection.
            self._release(conn, cursor)

    def batch_insert(self, db_name, data_list):
        """Insert several rows into table ``db_name`` with a single statement.

        The column list is taken from the first row; every other row must
        have exactly the same keys (in any order).

        Args:
            db_name: table name (trusted, interpolated into the SQL).
            data_list: list of column-name-to-value mappings.

        Returns:
            True. An empty ``data_list`` is a no-op that also returns True.

        Raises:
            ValueError: if a row's keys differ from those of the first row.
        """
        if not data_list:
            return True

        fields = list(data_list[0])
        field_set = set(fields)
        columns = "(`" + "`,`".join(fields) + "`)"
        row_group = self._placeholders(len(fields))
        values = ",".join([row_group] * len(data_list))

        params = []
        for row in data_list:
            if set(row) != field_set:
                raise ValueError("batch_insert rows must all have the same keys")
            # Read values in the column order of the statement, not in the
            # row's own key order, so values cannot land in the wrong column.
            params.extend(row[field] for field in fields)

        sql = """ INSERT INTO %s %s VALUES %s """ % (db_name, columns, values)

        conn = self.get_conn()
        cursor = None
        try:
            cursor = conn.cursor()
            cursor.execute(sql, tuple(params))
            if self.is_transaction is False:
                conn.commit()
        finally:
            self._release(conn, cursor)
        return True

    def update(self, db_name, data_dict, where):
        """Placeholder: not implemented, does nothing and returns None."""
        pass

    def execute(self, sql):
        """Run a write statement and return True.

        Args:
            sql: an SQL string, or a ``(sql, params)`` tuple.
        """
        conn = self.get_conn()
        cursor = None
        try:
            cursor = conn.cursor()
            self._run(cursor, sql)
            if self.is_transaction is False:
                conn.commit()
        finally:
            self._release(conn, cursor)
        return True

    def select(self, sql):
        """Run a query and return all rows as a list of dicts.

        Args:
            sql: an SQL string, or a ``(sql, params)`` tuple.
        """
        conn = self.get_conn()
        cursor = None
        try:
            cursor = conn.cursor(dictionary=True)
            self._run(cursor, sql)
            return cursor.fetchall()
        finally:
            self._release(conn, cursor)

    def select_one(self, sql):
        """Return the first row of a query, or ``{}`` when there is none."""
        rows = self.select(sql)
        # Test the list itself: indexing an empty result would raise.
        if rows and rows[0]:
            return rows[0]
        return {}

    def commit(self):
        """Commit the running transaction and close its connection."""
        self.is_transaction = False
        self.conn.commit()
        self.close()

    def rollback(self):
        """Roll back the running transaction and close its connection."""
        self.is_transaction = False
        self.conn.rollback()
        self.close()

    def close(self, conn=None):
        """Close ``conn``; with no argument close the held connection.

        Closing when no connection is held is a no-op.
        """
        if conn is None or conn is self.conn:
            if self.conn is not None:
                self.conn.close()
                self.conn = None
        else:
            conn.close()


def example():
    """Demonstrate the ``Db`` wrapper: transaction, insert, update, query, batch insert."""
    db = Db()

    # Transaction: both inserts go through one connection and are committed together.
    db.begin_transaction()
    for row in (
        {"field1": "value1", 'field2': "value2"},
        {"field1": "value11", 'field2': "value22"},
    ):
        db.insert("table_name", row)
    db.commit()

    # Single insert outside a transaction.
    single_row = {"field1": "value111", 'finished_time': "value222"}
    db.insert("table_name", single_row)

    # Update through raw SQL, passed as a (sql, params) tuple.
    update_sql = "update table_name set field1=%s,field2=%s where id = %s"
    db.execute((update_sql, ["value_update1", "value_update2", 1]))

    # Query through raw SQL, passed as a (sql, params) tuple.
    query_sql = "select * from table_name where id in (%s,%s)"
    db.select((query_sql, [1, 2]))

    # Batch insert of several rows.
    batch_rows = [
        {"field1": "aaa", "field2": "bbb"},
        {"field1": "aaaa", "field2": "bbbb"},
    ]
    db.batch_insert("table_name", batch_rows)

    return True

results matching ""

    No results matching ""