1.DBUtil.py
# -*- coding: utf-8 -*-
import web

# Connection settings used by every DBUtil instance.
DB_NAME = 'db_name'
DB_USERNAME = 'famiover'
DB_PASSWORD = 'famiover'
DB_HOST = 'localhost'
DB_PORT = 3306


class DBUtil(object):
    """Thin wrapper around a web.py database handle.

    Each public method forwards its arguments unchanged to the method of
    the same name on the handle created in ``__init__``.
    """

    # Placeholder until __init__ stores the real handle on the instance.
    __conn = None

    def __init__(self):
        """Create the handle from the module-level connection constants."""
        settings = {
            "dbn": "mysql",
            "db": DB_NAME,
            "user": DB_USERNAME,
            "pw": DB_PASSWORD,
            "host": DB_HOST,
            "port": DB_PORT,
            "charset": 'utf8',
        }
        self.__conn = web.database(**settings)

    def select(self, *table, **params):
        """Forward to the handle's ``select`` and return its result."""
        handle = self.__conn
        return handle.select(*table, **params)

    def query(self, sql, vars=None):
        """Forward ``sql`` and ``vars`` to the handle's ``query``."""
        handle = self.__conn
        return handle.query(sql, vars)

    def insert(self, *table, **params):
        """Forward to the handle's ``insert`` and return its result."""
        handle = self.__conn
        return handle.insert(*table, **params)

    def delete(self, *table, **params):
        """Forward to the handle's ``delete`` and return its result."""
        handle = self.__conn
        return handle.delete(*table, **params)

    def update(self, *table, **params):
        """Forward to the handle's ``update`` and return its result."""
        handle = self.__conn
        return handle.update(*table, **params)

    def transaction(self):
        """Return whatever the handle's ``transaction()`` returns."""
        handle = self.__conn
        return handle.transaction()
2.sql_util.py
# -*- coding: utf-8 -*-
import web

from famiover.exception.MyException import MyException
from famiover.model.response import response
from famiover.util.db_util import DBUtil
from famiover.util.object_util import is_dict, is_empty, is_dict_not_empty

# Shared database wrapper used by every helper in this module.
db = DBUtil()


def insert_any_table(params):
    """Insert one row into any table.

    ``params`` is a dict whose ``"tablename"`` entry names the table; every
    other entry is passed to ``db.insert`` as a column keyword argument.
    The caller's dict is left untouched. Nothing is inserted when no
    column entries remain.

    Raises:
        MyException: propagated from ``get_table_name`` when the table
            name is missing.
    """
    result = opt_validate(params)
    if not result.isOk:
        print(result.message)
        return
    tablename = get_table_name(params)
    # The original ran `del params` here, which unbound the name and made
    # the following lines fail; only the table-name entry must be dropped.
    columns = {key: value for key, value in params.items() if key != "tablename"}
    if is_dict_not_empty(columns):
        db.insert(tablename, **columns)


def delete_any_table(params):
    """Delete rows from any table.

    ``params["tablename"]`` names the table; the remaining entries are
    turned into the WHERE clause with ``web.db.sqlwhere``.

    NOTE(review): when no other entries are given, ``delete_table_name``
    substitutes ``{"0": 0}`` — presumably an always-true condition, which
    would delete every row. Confirm this is intended.

    Raises:
        MyException: propagated from ``get_table_name`` when the table
            name is missing.
    """
    result = opt_validate(params)
    if not result.isOk:
        print(result.message)
        return
    tablename = get_table_name(params)
    # Work on a shallow copy so the caller's dict keeps its "tablename".
    conditions = delete_table_name(dict(params))
    where = web.db.sqlwhere(conditions)
    db.delete(tablename, where=where)


def select_any_table(params):
    """Run a simple select against any table and return the result.

    ``params["tablename"]`` names the table; the remaining entries are
    turned into the WHERE clause with ``web.db.sqlwhere``.

    Returns:
        Whatever ``db.select`` returns.

    Raises:
        MyException: when validation fails or the table name is missing.
    """
    result = opt_validate(params)
    if not result.isOk:
        print(result.message)
        # The original called exit(0) here, terminating the whole process
        # with a success status; report the failure to the caller instead.
        raise MyException(MyException.NoTableName)
    tablename = get_table_name(params)
    # Work on a shallow copy so the caller's dict keeps its "tablename".
    conditions = delete_table_name(dict(params))
    where = web.db.sqlwhere(conditions)
    return db.select(tablename, where=where)


def update_any_table(params, values=None):
    """Update rows in any table.

    ``params["tablename"]`` names the table; the remaining entries of
    ``params`` are turned into the WHERE clause with ``web.db.sqlwhere``.

    Args:
        params: dict with ``"tablename"`` plus the filter entries.
        values: optional dict of column -> new value, passed to
            ``db.update`` as keyword arguments. Omitting it reproduces the
            original call, which passes no columns at all.

    Raises:
        MyException: propagated from ``get_table_name`` when the table
            name is missing.
    """
    result = opt_validate(params)
    if not result.isOk:
        print(result.message)
        return
    tablename = get_table_name(params)
    # Work on a shallow copy so the caller's dict keeps its "tablename".
    conditions = delete_table_name(dict(params))
    where = web.db.sqlwhere(conditions)
    if values:
        db.update(tablename, where=where, **values)
    else:
        # NOTE(review): with no column values there is nothing to SET;
        # confirm how the underlying driver treats such a statement.
        db.update(tablename, where=where)


def query_any_table(sql, sql_vars=None):
    """Run an arbitrary (complex) query and return the result.

    Args:
        sql: the SQL text, handed to ``db.query`` unchanged.
        sql_vars: optional bind variables forwarded as the second argument
            of ``db.query``. Prefer these over building ``sql`` by string
            concatenation when any part comes from outside the program.
    """
    return db.query(sql, sql_vars)


def get_table_name(params):
    """Return the table name stored under ``params["tablename"]``.

    Raises:
        MyException: ``MyException.NoTableName`` when ``params`` fails
            ``is_dict`` or has no truthy ``"tablename"`` entry.
    """
    if not (is_dict(params) and params.get("tablename")):
        raise MyException(MyException.NoTableName)
    return params["tablename"]


def delete_table_name(params):
    """Remove the ``"tablename"`` entry from ``params`` in place.

    Returns:
        ``params`` itself when ``is_dict_not_empty`` accepts what is left,
        otherwise the placeholder ``{"0": 0}``.

    Raises:
        KeyError: when ``params`` has no ``"tablename"`` entry.
    """
    del params["tablename"]
    if is_dict_not_empty(params):
        return params
    # The placeholder exists so that a syntactically valid WHERE clause
    # can still be built when no real conditions remain.
    return {"0": 0}


def opt_validate(params):
    """Validate ``params`` before a database operation.

    Returns:
        A ``response`` object; ``isOk`` is set to ``False`` and ``message``
        is filled in when ``is_empty`` reports the table name as empty.

    Raises:
        MyException: propagated from ``get_table_name``, which already
            rejects a missing or falsy table name before the ``is_empty``
            check runs.
    """
    result = response()
    tablename = get_table_name(params)
    if is_empty(tablename):
        result.isOk = False
        result.message = "没有传入表名"
    return result


def db_result_to_dict(db_result):
    """Convert an iterable of result rows into a dict.

    Rows are keyed ``"rs1"``, ``"rs2"``, ... in iteration order.
    """
    return {"rs" + str(index): row for index, row in enumerate(db_result, 1)}
3.sql_test.py
# -*- coding: utf-8 -*-
from famiover.model.SMSql import get_out_user
from famiover.util.sql_util import select_any_table, query_any_table


def query1():
    """Select `user` rows by a fixed name, then print each row and the row count."""
    rows = select_any_table({'tablename': 'user', 'name': '马建强'})
    for row in rows:
        print(row)
    print(len(rows))


def query2():
    """Call ``get_out_user`` for a fixed date range and print what it returns."""
    param = dict(
        tablename="user",
        begin_day="2016-04-19",
        end_day="2016-10-13",
    )
    print(get_out_user(param))


def query3():
    """Placeholder; intentionally does nothing."""


def main():
    """Run the scenario currently selected for manual testing."""
    query3()


if __name__ == '__main__':
    main()