脚本专栏 
首页 > 脚本专栏 > 浏览文章

详解使用Python写一个向数据库填充数据的小工具(推荐)

(编辑:jimmy 日期: 2025/1/13 浏览:3 次 )

一. 背景

公司又要做一个新项目,是一个合作型项目,我们公司出web展示服务,合作伙伴线下提供展示数据。

而且本次项目是数据统计展示为主要功能,并没有研发对应的数据接入接口,所有展示数据源均来自数据库查询,

所以验证数据没有别的入口,只能通过在数据库写入数据来进行验证。

二. 工具

Python+mysql

三.前期准备

前置:当然是要先准备好测试方案和测试用例,在准备好这些后才能目标明确将要开发自动化小工具都要有哪些功能,避免走弯路

3.1 跟开发沟通

1)确认数据库连接方式,库名 ;

2)测试所涉及到的表名;

3)每个表是对哪部分业务造成影响;

4)表之间的关联关系,同业务模块之间的不同表之间是否有关联字段;

5)表中个字段数据是否有特殊来源,如用户ID、厂商名称一类与业务有一定关联程度的字段,一定要确认好是可以随机生成的 还是 需要从指定表中获取字段

3.2 设计小工具最终要实现的效果

我设计这个小工具就希望它可以完成两件事情

1) 生成sql语句,并且能写入进入mysql数据库

2) 数据我写入sql的统计结果,方便使用它直接与页面结果进行比对

3.3 给自己加个油! 我一定可以的!

四.开始编程

***因为代码内实际业务逻辑比较强,所以下面只写出一些示例啦~

4.1 先处理一下基础数据

1)因为我将要写入的内容涉及到全国的省市名称和省市行政区域代码,所以先找开发要了一个他们使用的全国省市名称及代码对照json,然后处理成字典格式备用

2)编写写入sql的语句头

通过配置文件的方式将各个表的写入语句的前半句语法格式和写入字段名称编辑好。

之后每次使用的时候只需通过配置文件读取对应表的写入语句再拼装上要写入的内容就是一个完整的sql语句了

使用配置文件的模式主要是可以统一管理,并且可以保持后面生成环节的代码整洁

[veh_scrap]
veh = INSERT INTO zqy_veh_scrap_sto (zqy_id, vin, pack_num, scrap_time, bat_is_scrap,wmi,epname,epcode,province_code,city_code,submit_time) values

4.2 编写工具模块

1)编写随机工具

因为写入的内容大部分字段需要不重复,所以使用random模块,定义不同的方法生成各种类型随机数据,如随机身份证号 随机姓名 随机编码 随机ID 等等...

再生成写入数据的时候,就可以引用这个随机工具模块引用里面的方法取随机值,以保证写入数据的唯一性

例:

def random_vin():
 """
 生成随机vin
 :return:
 """
 return ''.join(
  random.sample(['Z', 'Y', 'X', 'W', 'V', 'U', 'T', 'S', 'R', 'Q', 'P',
      'O', 'N', 'M', 'L', 'K', 'J', 'I', 'H', 'G', 'F', 'E',
      'D', 'C', 'B', 'A', '1', '2', '3', '4', '5', '6', '7',
      '8', '9', '0'], 17))

2)pyMysql模块引入并封装可用工具

因为最终要将内容写入,所以要使用pyMysql模块,将组装好的sql语句执行写入

需要封装的内容有 数据库链接 获取游标 语句执行方法 数据库查询方法 删除语句方法

例:

def get_conn():
 conn = pymysql.connect(
  host=,
  port=,
  user=,
  password=,
  database=,
  charset='utf8')
 return conn


def execut_sql(sql):
 try:

  conn = get_conn()
  cur = conn.cursor()
  cur.execute(sql)
  conn.commit()
  print("数据库执行成功")
  cur.close()

 except Exception as e:
  print(str(e))
  print(sql)
  # 有异常就回滚
  conn.rollback()
  # 关闭连接
 cur.close()
 conn.close()

def select_one_sql(sql):
 try:
  conn = get_conn()
  cur = conn.cursor()
  cur.execute(sql)
  results = cur.fetchone()
  #print(results)
  cur.close()

  return results
 except Exception as e:
  print(str(e))
  # 关闭连接
 cur.close()
 conn.close()

3)读取配置的模块

因为我们将sql语句的头通过配置文件进行管理,那么就需要一个读取配置的模块或方法,因为我比较菜 所以为了看起来更加清晰 就用模块来进行管理了

import configparser

cfg_path = 'rebulid_generate_sql_git\base_data\sql_header.ini'
cfg = configparser.ConfigParser()
cfg.read(cfg_path,encoding='utf-8')

def get_config_data(section,options):
 return cfg.get(section,options)

4) 编写生成写入信息的方法

因为我们在处理基础数据的时候就已经写好了sql写入语句头了,所以现在只要生成它后面的values值就可以了,这步就是生成他values值的步骤

创建一个列表,根据写入语句内字段的顺序生成对应字段的值并添加至创建的列表中。对应字段的内容如果是随机值,就使用之前准备好的随机模块,使用里面的方法生成随机值;如果是需要从其他表中获取的值,则通过封装好的qymysql的查找方法进行搜索并引用。

例:

def recycle_veh_scrap(prov, city):
 """
 报废车辆入库表
 :return:
 """
 prov_info = random_util.random_city_code(prov, city)
 # 获取当地所有公司信息
 company = search_factory_info.get_company_data(prov, city)
 # 获取vin
 vin = random_util.random_vin()
 # 生成数据容器
 insert_data = []
 # 生成随机zqy_id,写入列表
 insert_data.append(random_util.random_id())
 # 生成随机vin,写入列表
 insert_data.append(vin)
 # 生成随机电池包数,写入列表
 insert_data.append(random.randint(1, 10))
 # 生成报废时间,写入列表
 insert_data.append(random_util.random_date())
 # bat_is_scrap
 insert_data.append('1')
 # wmi
 insert_data.append(vin[0:3].upper())
 # 生成报废企业名称,写入列表
 insert_data.append(company[0])
 # 生成报废企业代码,写入列表
 insert_data.append(company[1])
 # 获取省级代码,写入列表
 insert_data.append(prov_info[0])
 # 获取市级代码,写入列表
 insert_data.append(prov_info[1])
 # 数据提交时间为当前数据生成时间,写入列表
 insert_data.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))return insert_data

5) 编写统计方法

因为我们写入的值是随机的,所以没有标准去判断最终系统写入的内容是否正确。

那就需要我们将需要对比的值取出保存并统计,用于最后对照使用。

取值的过程会在语句写入之前一步完成,现在要先准备一个数据处理的模块,以便对取出的至进行统计。

可以先准备几个比较常用的,例如列表内同类值 列表内数值求和 字典合并等方法,如果后面再有更复杂的格式,再单独编写。

例:

def TongJiQi(list):
 # 列表内容统计器,对列表内的重复项进行数量统计
 count_dict = dict()
 for item in list:
  if item in count_dict:
   count_dict[item] += 1
  else:
   count_dict[item] = 1
 return count_dict


def qiuhe(data_list):
 """
 对列表内的数值进行求和
 :param data_list:
 :return:
 """
 total = 0
 for ele in range(0, len(data_list)):
  total = total + data_list[ele]
 return total

以上,准备工作就都做好了,下面就要开始真正的写入了

4.3 生成数据并拼装为sql语句

1) 通过生产sql语句数量来控制循环,将sql头与值拼装在一起,并将拼装好的结果进行写入

2) 在循环生成写入值的过程中,将需要统计或计算的值取出单独保存,在写入结束后再进行技术统计输出统计结果

def write_sql(prov, city, des_prov, des_city)
# 通过配置文件获取sql头
sql = config_util.get_config_data('veh_scrap', 'pack_out')
# 控制循环
i = 0
# 创建列表,用于收集需要统计计算的数据
company_num = []
des_company_num = []
while i < sql_num:
 i += 1
 # 取生成的values值
 data = _crap_out(prov, city, des_prov, des_city)
 # 收集需要统计的数据
 company_num.append(data[8])
 des_company_num.append(data[4])
 # 拼装sql语句
 sql = sql + str(tuple(data)) + ','
 # 最后一组数据后的,删除掉
right_sql = sql.strip(',')
# 拼装上结尾的; fullsql作为返回值
full_sql = right_sql + ';'
# print(full_sql)
# 执行写入
pymysql_util.execut_sql(full_sql)
print('上报企业统计: ' + str(
 statistics_util.TongJiQi(company_num)))
print('去向企业统计: ' + str(
 statistics_util.TongJiQi(des_company_num)))

这样一个表的数据就写入成功了,只需要调整写入条数就可以想写入多少条就写入多少条,还可以直接输出你关心的字段统计结果。

多个数据库表的话可以就是将以上的生产数据和写入数据的步骤复制,按照表名和字段稍作修改就可以了。

再将所有表的数据生成和写入都编写完成后,可以编写一个小工具的入口,给自己编写一个选择器

每次只要输入对应的数字就可以执行对应的方法,写入数据并输出统计结果啦

def main(prov, city, sql_num, sour_prov, sour_city):
 """
 启动数据自动写入的主方法
 :return:
 """
 features_type = input("请选择生成数据所属功能模块 1.车辆报废 2.梯次利用 3.资源再生")
 if features_type == '1':
  scrapped_main.write_sql(prov, city, sql_num, des_prov=sour_prov,
        des_city=sour_city)
 elif features_type == '2':
  echelon_use_main.write_main(prov, city, sql_num, sour_prov, sour_city)
 elif features_type == '3':
  recycle_main.write_main(sql_num, prov, city, sour_prov, sour_city)

五.总结

以上只是一个工作过程中仓促搞出的一个小东西,还有很多不足。抛转引玉,拿出来跟大家分享一下我的思路,希望能对大家有所帮助!

还有我趟过的坑,也记录一下吧

1.不要只管闷头写,写完一个表的时候就执行试一下,不只要看数据库写进去了 主要是看看要测试的平台能不能看到,之前因为开发忘了跟我说一个字段条件,我全都写完了也执行成功了,但就是上不去平台,最后不得不大改一遍

2.随时跟开发沟通确认任何一个不确定的问题,因为有的表可能会用不到,表内的某些字段没有用,表设计不明确的时候等等,这些时候都要跟他们确认好再继续动手写,防止无用功。

3.这只是辅助测试的工具,要控制住开发工具的时间,防止测试工作的延误。

上一篇:Python Pivot table透视表使用方法解析
下一篇:Python extract及contains方法代码实例