摘要:功能描述作業需求額度或自定義實現購物商城,買東西加入購物車,調用信用卡接口結賬可以提現,手續費支持多賬戶登錄支持賬戶間轉賬記錄每月日常消費流水提供還款接口記錄操作日志提供管理接口,包括添加賬戶用戶額度,凍結賬戶等。。。
功能描述
作業需求:
1、額度 15000或自定義 2、實現購物商城,買東西加入購物車,調用信用卡接口結賬 3、可以提現,手續費5% 4、支持多賬戶登錄 5、支持賬戶間轉賬 6、記錄每月日常消費流水 7、提供還款接口 8、ATM記錄操作日志 9、提供管理接口,包括添加賬戶、用戶額度,凍結賬戶等。。。 10、用戶認證用裝飾器
注意:以上需求,要充分使用函數,請盡你的最大限度來減少重復代碼!
流程圖程序流程圖(待補全)
![]()
bin ├── atm.py # atm入口 ├── __init__.py └── manage.py # 管理入口 conf ├── __init__.py └── settings.py # 配置文件 core ├── accounts.py # 賬號添加、修改額度、禁用、啟動接口 ├── actions.py # sql動作接口 ├── auth.py # 用戶認證接口 ├── database.py # 數據庫操作接口 ├── db_handler.py# 無用到 ├── __init__.py ├── logger.py # 日志接口 ├── main.py # 主接口 ├── parsers.py # sql語法解析接口 └── transaction.py # 交易接口 db ├── accounts_table # 用戶賬號表 └── managers_table # 管理員賬號表 log ├── access.log #訪問日志 └── transactions.log #交易日志程序主體
github鏈接
atm.py
#!_*_coding:utf-8_*_ import os import sys base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) print(base_dir) sys.path.append(base_dir) from core import main if __name__ == "__main__": main.run("atm")
manage.py
#!_*_coding:utf-8_*_ import os import sys base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) print(base_dir) sys.path.append(base_dir) from core import main if __name__ == "__main__": main.run("manage")
setting.py
#!_*_coding:utf-8_*_ #__author__:"Alex Li" import os import sys import logging BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # Database title summary TITLE = ["id","name","age","phone","dept","enroll_date","expire_date","account","password","credit","balance","status","pay_day"] # Account database setting DATABASE = { "engine": "file_storage", # support mysql, postgresql in the future "name":"accounts_table", "path": "%s/db/" % BASE_DIR } # Manager account database setting MANAGE_DATABASE = { "engine": "file_storage", # support mysql, postgresql in the future "name":"managers_table", "path": "%s/db/" % BASE_DIR } # logger setting LOG_LEVEL = logging.INFO LOG_TYPES = { "transaction": "transactions.log", "access": "access.log", } # Transaction setting TRANSACTION_TYPE = { "repay":{"action":"plus", "interest":0}, "withdraw":{"action":"minus", "interest":0.05}, "transfer":{"action":"minus", "interest":0.05}, "consume":{"action":"minus", "interest":0}, } ACCOUNT_DEFAULT = { "credit": 15000.0 }
accounts.py
#!_*_coding:utf-8_*_ import json import time from core import database from conf import settings from core import parsers from core import actions def load_accounts(account): """ Check account whether exists in a database :param account: :return: """ base_dir = settings.DATABASE["path"] sql_str = "select * from accounts_table where account = %s" % account sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) res = actions.actions(sql_type, dict_sql) if not res: return False else: return True def change_account(account, set_str): """ Change account data :param account: :param set_str: :return: """ base_dir = settings.DATABASE["path"] sql_str = "update accounts_table set %s where account = %s" % (set_str, account) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) actions.actions(sql_type, dict_sql) def add_account(*args): """ Add an new account :param args: :param kwargs: :return: """ base_dir = settings.DATABASE["path"] sql_str = "add to accounts_table values %s" % (",".join(args)) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) actions.actions(sql_type, dict_sql)
actions.py
# -*- coding: utf-8 -*- from core import database from conf import settings import re def actions(sql_type,dict_sql): """ sql操作主函數 :param sql_type: sql語句的類型 :return: actions_dict[sql_type] 相應操作的函數 """ actions_dict = {"select": select_action, "add": add_action, "del": del_action, "update": update_action} if sql_type in actions_dict: # 判斷導入的sql類型是否在actions_dict字典中定義。 return actions_dict[sql_type](dict_sql) else: return False def select_action(dict_sql): temp_list = [] info = dict_sql["select"] data = database.read_db(dict_sql["from"]) # 獲取原始數據庫文件中的所有數據,data為列表格式 key = dict_sql["where"][0] # 獲取sql語句中where語句的key值。如id = 1,獲取id count = 0 for values in data: # 讀取data列表中的每一個元素,values是字典格式 if type(values[key]) is int: value = str(values[key]) else: value = """ + str(values[key]) + """ dict_sql["where"][0] = value # 將values[key]的值取出并重新賦值為sql語句的key值。 print(where_action(dict_sql["where"])) if where_action(dict_sql["where"]): # 將新的where語句,發送給where_action語句進行bool判斷。 count += 1 temp_list.append(values) return temp_list def add_action(dict_sql): """ 插入動作 獲取用戶輸入的values,并在表中插入 :param dict_sql: parsers函數處理后的字典格式的sql語句 """ data = database.read_db(dict_sql["to"]) # 獲取原始數據庫文件中的所有數據 value = dict_sql["values"] # 從dict_sql中獲取values的列表 t_id = str(int(data[-1]["id"]) + 1) # 獲取原始數據庫文件中id列最后一行的id數值,并每次自動+1。然后轉換為字符串格式 value.insert(0, t_id) # 將添加的id插入到value變量中 if len(value) != len(settings.TITLE): # 判斷輸入值得長度是否等于數據庫文件中定義的列的長度 print("列數不正確") else: data.append(dict(zip(settings.TITLE, value))) # 在獲取的原始數據中插入行的數據 database.write_db(dict_sql["to"], data) def del_action(dict_sql): """ 刪除動作函數 :param dict_sql: parsers函數處理后的字典格式的sql語句 """ temp_list = [] data = database.read_db(dict_sql["from"]) # 獲取原始數據庫文件中的所有數據,data為列表格式 key = dict_sql["where"][0] # 獲取sql語句中where語句的key值。如id = 1,獲取id for values in data: # 讀取data列表中的每一個元素,values是字典格式 if type(values[key]) is int: value = str(values[key]) else: value = """ + str(values[key]) + """ dict_sql["where"][0] = value # 將values[key]的值取出并重新賦值為sql語句的key值。 if where_action(dict_sql["where"]): # 將新的where語句,發送給where_action語句進行bool判斷。 temp_list.append(values) # 如果符合條件,就從data中移除對應的values return temp_list # print("已刪除%s條記錄" % len(temp_list)) # for i in temp_list: # data.remove(i) # write_db(dict_sql["from"], data) # 將新生成的data重新寫入文件 def update_action(dict_sql): """ 更新動作函數 :param dict_sql: parsers函數處理后的字典格式的sql語句 """ data = database.read_db(dict_sql["update"]) # 獲取原始數據庫文件中的所有數據,data為列表格式 key = dict_sql["where"][0] # 獲取sql語句中where語句的key值。如id = 1,獲取id set_key = dict_sql["set"][0] # 獲取set語句中用戶輸入的key set_value = dict_sql["set"][2].strip(""").strip(""") # 獲取set語句中用戶輸入的value count = 0 for values in data: # 讀取data列表中的每一個元素,values是字典格式 if type(values[key]) is int: value = str(values[key]) else: value = """ + str(values[key]) + """ dict_sql["where"][0] = value # 將values[key]的值取出并重新賦值為sql語句的key值。 if where_action(dict_sql["where"]): # 將新的where語句,發送給where_action語句進行bool判斷。 count += 1 values[set_key] = set_value # 如果符合條件,使用將set_key的值修改為set_value print(data) print("已更新%s條記錄" % count) database.write_db(dict_sql["update"], data) # 將新生成的data重新寫入文件 def where_action(condition): """ where語句操作函數 :param condition: 判斷語句。就是字典中where的值 :return: """ if "like" in condition: # 如果like在語句中 # 將where語句中的第二個參數和,第一個參數進行正則比較。如果執行正常就返回True return re.search(condition[2].strip(""").strip("""), condition[0]) and True else: return eval(" ".join(condition)) # 除此使用eval進行python的邏輯判斷
auth.py
#!_*_coding:utf-8_*_ import os from core import parsers from core import actions from core import db_handler from conf import settings from core import logger import json import time def login_required(func): "驗證用戶是否登錄" def wrapper(*args, **kwargs): print(args, kwargs) # print("--wrapper--->",args,kwargs) if args[0].get("is_authenticated"): return func(*args, **kwargs) else: exit("User is not authenticated.") return wrapper def acc_auth(account, password, type): """ 優化版認證接口 :param account: credit account number :param password: credit card password :return: if passed the authentication , retun the account object, otherwise ,return None """ # table = None # base_dir = None if type == "atm": base_dir = settings.DATABASE["path"] table = settings.DATABASE["name"] elif type == "manage": base_dir = settings.MANAGE_DATABASE["path"] table = settings.MANAGE_DATABASE["name"] sql_str = "select * from %s where account = %s" % (table, account) sql_type = sql_str.split()[0] dict_sql = parsers.parsers(sql_str, sql_type, base_dir) print(dict_sql) res = actions.actions(sql_type, dict_sql) print(res) if not res: print("