数据库
MySQL
安装驱动
pip install mysql-connector-python --allow-external mysql-connector-python
# 如果上面的命令安装失败,可以试试另一个驱动
pip install mysql-connector
数据库操作
import mysql.connector
import random
# 创建数据库连接
conn = mysql.connector.connect(user='root', password='12345678', database='test')
# 新增
cursor = conn.cursor()
cursor.execute("insert into python_test (name) values (%s)", ['test' + str(random.randint(1, 100))])
# 获取新增的ID
lastInsertId1 = cursor.lastrowid
cursor.execute("insert into python_test (name) values (%s)", ['test' + str(random.randint(1, 100))])
# 获取新增的ID
lastInsertId2 = cursor.lastrowid
# 提交新增
conn.commit()
# 关闭游标
cursor.close()
# 修改
cursor = conn.cursor()
cursor.execute("update python_test set name=%s where id = %s",['new_test' + str(random.randint(1,100)),lastInsertId2])
conn.commit()
cursor.close()
# 删除
cursor = conn.cursor()
cursor.execute("delete from python_test where id = %s",[lastInsertId1])
conn.commit()
cursor.close()
# 查询
cursor = conn.cursor()
cursor.execute("select * from python_test")
res = cursor.fetchall()
print(res)
一个简单的数据库封装
import mysql.connector
import config
class Db(object):
database = 'default'
conn = None
is_transaction = False
def __init__(self, database='default'):
self.database = database
def begin_transaction(self):
self.get_conn()
self.is_transaction = True
def get_conn(self):
if self.is_transaction is False:
if self.database == 'default':
self.conn = mysql.connector.connect(user=config.DB_USER, password=config.DB_PASSWORD, database=config.DB_DATABASE,
host=config.DB_HOST, port=config.DB_PORT, connection_timeout=10)
return self.conn
else:
raise BaseException('获取数据库连接失败')
else:
if self.conn is None:
raise BaseException('数据库连接丢失')
return self.conn
def insert(self, db_name, data_dict):
db_field = data_dict.keys()
db_field = "(`" + "`,`".join(db_field) + "`)"
data_values = "(" + "%s," * (len(data_dict)) + ")"
data_values = data_values.replace(',)', ')')
conn = self.get_conn()
cursor = conn.cursor()
sql = """ INSERT INTO %s %s VALUES %s """ % (db_name, db_field, data_values)
cursor.execute(sql, tuple(data_dict.values()))
res = cursor.lastrowid
if self.is_transaction is False:
conn.commit()
cursor.close()
if self.is_transaction is False:
self.close(conn)
return res
def batch_insert(self, db_name, data_list):
db_field = data_list[0].keys()
db_field = "(`" + "`,`".join(db_field) + "`)"
length = len(data_list[0])
temp = "(" + "%s," * length + ")"
temp = temp.replace(',)', '),')
data_values = temp * len(data_list)
data_values = data_values.rstrip(",")
params = []
for data_dict in data_list:
for key in data_dict:
params.append(data_dict[key])
conn = self.get_conn()
cursor = conn.cursor()
sql = """ INSERT INTO %s %s VALUES %s """ % (db_name, db_field, data_values)
cursor.execute(sql, tuple(params))
if self.is_transaction is False:
conn.commit()
cursor.close()
if self.is_transaction is False:
self.close(conn)
return True
def update(self, db_name, data_dict, where):
pass
def execute(self, sql):
conn = self.get_conn()
cursor = conn.cursor()
if type(sql) is tuple:
cursor.execute(sql[0], sql[1])
else:
cursor.execute(sql)
if self.is_transaction is False:
conn.commit()
cursor.close()
if self.is_transaction is False:
self.close(conn)
return True
def select(self, sql):
conn = self.get_conn()
cursor = conn.cursor(dictionary=True)
if type(sql) is tuple:
cursor.execute(sql[0], sql[1])
else:
cursor.execute(sql)
res = cursor.fetchall()
cursor.close()
if self.is_transaction is False:
self.close(conn)
return res
def select_one(self, sql):
res = self.select(sql)
if res[0]:
return res[0]
return {}
def commit(self):
self.is_transaction = False
self.conn.commit()
self.close()
def rollback(self):
self.is_transaction = False
self.conn.rollback()
self.close()
def close(self, conn=None):
if conn is None or conn is self.conn:
self.conn.close()
self.conn = None
else:
conn.close()
def example():
db_object = Db()
# 事务
db_object.begin_transaction()
db_object.insert("table_name", {"field1": "value1", 'field2': "value2"})
db_object.insert("table_name", {"field1": "value11", 'field2': "value22"})
db_object.commit()
# 新增
db_object.insert("table_name", {"field1": "value111", 'finished_time': "value222"})
# 修改,SQL
db_object.execute(
("update table_name set field1=%s,field2=%s where id = %s", ["value_update1", "value_update2", 1]))
# 查询,SQL
db_object.select(("select * from table_name where id in (%s,%s)", [1, 2]))
# 批量新增
db_object.batch_insert("table_name", [{"field1": "aaa", "field2": "bbb"}, {"field1": "aaaa", "field2": "bbbb"}])
return True