国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

python初學:第二步——模擬實現一個ATM + 購物商城程序

rollback / 2515人閱讀

摘要:功能描述作業需求額度或自定義實現購物商城,買東西加入購物車,調用信用卡接口結賬可以提現,手續費支持多賬戶登錄支持賬戶間轉賬記錄每月日常消費流水提供還款接口記錄操作日志提供管理接口,包括添加賬戶用戶額度,凍結賬戶等。。。

功能描述

作業需求:

1、額度 15000或自定義
2、實現購物商城,買東西加入購物車,調用信用卡接口結賬
3、可以提現,手續費5%
4、支持多賬戶登錄
5、支持賬戶間轉賬
6、記錄每月日常消費流水
7、提供還款接口
8、ATM記錄操作日志
9、提供管理接口,包括添加賬戶、用戶額度,凍結賬戶等。。。
10、用戶認證用裝飾器

注意:以上需求,要充分使用函數,請盡你的最大限度來減少重復代碼!

流程圖

程序流程圖(待補全)
![]()

程序目錄結構
bin
├── atm.py  # atm入口
├── __init__.py
└── manage.py  # 管理入口
conf
├── __init__.py
└── settings.py  # 配置文件
core
├── accounts.py  # 賬號添加、修改額度、禁用、啟動接口
├── actions.py   # sql動作接口
├── auth.py      # 用戶認證接口
├── database.py  # 數據庫操作接口
├── db_handler.py# 無用到
├── __init__.py
├── logger.py    # 日志接口
├── main.py      # 主接口
├── parsers.py   # sql語法解析接口
└── transaction.py # 交易接口
db
├── accounts_table  # 用戶賬號表
└── managers_table  # 管理員賬號表
log
├── access.log      #訪問日志
└── transactions.log #交易日志
程序主體

github鏈接

atm.py

#!_*_coding:utf-8_*_

import os
import sys
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(base_dir)
sys.path.append(base_dir)

from core import main

if __name__ == "__main__":
    main.run("atm")

manage.py

#!_*_coding:utf-8_*_

import os
import sys
base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(base_dir)
sys.path.append(base_dir)

from core import main

if __name__ == "__main__":
    main.run("manage")

setting.py

#!_*_coding:utf-8_*_
#__author__:"Alex Li"
import os
import sys
import logging
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Database title summary
TITLE = ["id","name","age","phone","dept","enroll_date","expire_date","account","password","credit","balance","status","pay_day"]

# Account database setting
DATABASE = {
    "engine": "file_storage",  # support mysql, postgresql in the future
    "name":"accounts_table",
    "path": "%s/db/" % BASE_DIR
}

# Manager account database setting
MANAGE_DATABASE = {
    "engine": "file_storage",  # support mysql, postgresql in the future
    "name":"managers_table",
    "path": "%s/db/" % BASE_DIR
}

# logger setting
LOG_LEVEL = logging.INFO
LOG_TYPES = {
    "transaction": "transactions.log",
    "access": "access.log",
}

# Transaction setting
TRANSACTION_TYPE = {
    "repay":{"action":"plus", "interest":0},
    "withdraw":{"action":"minus", "interest":0.05},
    "transfer":{"action":"minus", "interest":0.05},
    "consume":{"action":"minus", "interest":0},

}

ACCOUNT_DEFAULT = {
    "credit": 15000.0

}

accounts.py

#!_*_coding:utf-8_*_
import json
import time
from core import database
from conf import settings
from core import parsers
from core import actions


def load_accounts(account):
    """ Check account whether exists in a database

    :param account:
    :return:
    """
    base_dir = settings.DATABASE["path"]
    sql_str = "select * from accounts_table where account = %s" % account
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, base_dir)
    res = actions.actions(sql_type, dict_sql)
    if not res:
        return False
    else:
        return True


def change_account(account, set_str):
    """ Change account data

    :param account:
    :param set_str:
    :return:
    """
    base_dir = settings.DATABASE["path"]
    sql_str = "update accounts_table set %s where account = %s" % (set_str, account)
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, base_dir)
    actions.actions(sql_type, dict_sql)


def add_account(*args):
    """ Add an new account

    :param args:
    :param kwargs:
    :return:
    """
    base_dir = settings.DATABASE["path"]
    sql_str = "add to accounts_table values %s" % (",".join(args))
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, base_dir)
    actions.actions(sql_type, dict_sql)

actions.py

# -*- coding: utf-8 -*-
from core import database
from conf import settings
import re


def actions(sql_type,dict_sql):
    """ sql操作主函數

    :param sql_type: sql語句的類型
    :return:
        actions_dict[sql_type]
        相應操作的函數
    """
    actions_dict = {"select": select_action,
                    "add": add_action,
                    "del": del_action,
                    "update": update_action}
    if sql_type in actions_dict:  # 判斷導入的sql類型是否在actions_dict字典中定義。
        return actions_dict[sql_type](dict_sql)
    else:
        return False


def select_action(dict_sql):
    temp_list = []
    info = dict_sql["select"]
    data = database.read_db(dict_sql["from"])  # 獲取原始數據庫文件中的所有數據,data為列表格式
    key = dict_sql["where"][0]  # 獲取sql語句中where語句的key值。如id = 1,獲取id
    count = 0
    for values in data:  # 讀取data列表中的每一個元素,values是字典格式
        if type(values[key]) is int:
            value = str(values[key])
        else:
            value = """ + str(values[key]) + """
        dict_sql["where"][0] = value  # 將values[key]的值取出并重新賦值為sql語句的key值。
        print(where_action(dict_sql["where"]))
        if where_action(dict_sql["where"]):  # 將新的where語句,發送給where_action語句進行bool判斷。
            count += 1
            temp_list.append(values)
    return temp_list


def add_action(dict_sql):
    """ 插入動作
        獲取用戶輸入的values,并在表中插入

    :param dict_sql: parsers函數處理后的字典格式的sql語句
    """
    data = database.read_db(dict_sql["to"])  # 獲取原始數據庫文件中的所有數據
    value = dict_sql["values"]  # 從dict_sql中獲取values的列表
    t_id = str(int(data[-1]["id"]) + 1)  # 獲取原始數據庫文件中id列最后一行的id數值,并每次自動+1。然后轉換為字符串格式
    value.insert(0, t_id)  # 將添加的id插入到value變量中
    if len(value) != len(settings.TITLE):  # 判斷輸入值得長度是否等于數據庫文件中定義的列的長度
        print("列數不正確")
    else:
        data.append(dict(zip(settings.TITLE, value)))  # 在獲取的原始數據中插入行的數據
        database.write_db(dict_sql["to"], data)


def del_action(dict_sql):
    """ 刪除動作函數

    :param dict_sql: parsers函數處理后的字典格式的sql語句
    """
    temp_list = []
    data = database.read_db(dict_sql["from"])  # 獲取原始數據庫文件中的所有數據,data為列表格式
    key = dict_sql["where"][0]  # 獲取sql語句中where語句的key值。如id = 1,獲取id
    for values in data:  # 讀取data列表中的每一個元素,values是字典格式
        if type(values[key]) is int:
            value = str(values[key])
        else:
            value = """ + str(values[key]) + """
        dict_sql["where"][0] = value  # 將values[key]的值取出并重新賦值為sql語句的key值。
        if where_action(dict_sql["where"]):  # 將新的where語句,發送給where_action語句進行bool判斷。
            temp_list.append(values)  # 如果符合條件,就從data中移除對應的values
    return temp_list
    # print("已刪除%s條記錄" % len(temp_list))
    # for i in temp_list:
    #     data.remove(i)
    # write_db(dict_sql["from"], data)  # 將新生成的data重新寫入文件


def update_action(dict_sql):
    """ 更新動作函數

    :param dict_sql: parsers函數處理后的字典格式的sql語句
    """
    data = database.read_db(dict_sql["update"])  # 獲取原始數據庫文件中的所有數據,data為列表格式
    key = dict_sql["where"][0]  # 獲取sql語句中where語句的key值。如id = 1,獲取id
    set_key = dict_sql["set"][0]  # 獲取set語句中用戶輸入的key
    set_value = dict_sql["set"][2].strip(""").strip(""")  # 獲取set語句中用戶輸入的value
    count = 0
    for values in data:  # 讀取data列表中的每一個元素,values是字典格式
        if type(values[key]) is int:
            value = str(values[key])
        else:
            value = """ + str(values[key]) + """
        dict_sql["where"][0] = value  # 將values[key]的值取出并重新賦值為sql語句的key值。
        if where_action(dict_sql["where"]):  # 將新的where語句,發送給where_action語句進行bool判斷。
            count += 1
            values[set_key] = set_value  # 如果符合條件,使用將set_key的值修改為set_value
    print(data)
    print("已更新%s條記錄" % count)
    database.write_db(dict_sql["update"], data)  # 將新生成的data重新寫入文件


def where_action(condition):
    """ where語句操作函數

    :param condition: 判斷語句。就是字典中where的值
    :return:
    """
    if "like" in condition:  # 如果like在語句中
        # 將where語句中的第二個參數和,第一個參數進行正則比較。如果執行正常就返回True
        return re.search(condition[2].strip(""").strip("""), condition[0]) and True

    else:
        return eval(" ".join(condition))  # 除此使用eval進行python的邏輯判斷

auth.py

#!_*_coding:utf-8_*_
import os
from core import parsers
from core import actions
from core import db_handler
from conf import settings
from core import logger
import json
import time


def login_required(func):
    "驗證用戶是否登錄"

    def wrapper(*args, **kwargs):
        print(args, kwargs)
        # print("--wrapper--->",args,kwargs)
        if args[0].get("is_authenticated"):
            return func(*args, **kwargs)
        else:
            exit("User is not authenticated.")
    return wrapper


def acc_auth(account, password, type):
    """
    優化版認證接口
    :param account: credit account number
    :param password: credit card password
    :return: if passed the authentication , retun the account object, otherwise ,return None

    """
    # table = None
    # base_dir = None
    if type == "atm":
        base_dir = settings.DATABASE["path"]
        table = settings.DATABASE["name"]
    elif type == "manage":
        base_dir = settings.MANAGE_DATABASE["path"]
        table = settings.MANAGE_DATABASE["name"]
    sql_str = "select * from %s where account = %s" % (table, account)
    sql_type = sql_str.split()[0]
    dict_sql = parsers.parsers(sql_str, sql_type, base_dir)
    print(dict_sql)
    res = actions.actions(sql_type, dict_sql)
    print(res)
    if not res:
        print("