日志记录:
调用同一个对象,分别记录错误日志和运行日志
自定义日志类:
class Logger(object):
    """Process-wide singleton that writes run and error messages to separate files.

    The file locations are read from ``settings.RUN_LOG_FILE`` and
    ``settings.ERROR_LOG_FILE``. Every ``Logger()`` call returns the same
    object, and the file handlers are attached only once.
    """

    __instance = None
    __initialized = False
    _LOG_FORMAT = "%(asctime)s - %(levelname)s : %(message)s"

    def __new__(cls, *args, **kwargs):
        # Singleton. object.__new__ rejects extra arguments on Python 3,
        # so they are accepted here but not forwarded.
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        # __init__ runs on every Logger() call even though __new__ hands back
        # the shared instance; without this guard each call would attach one
        # more FileHandler and every run message would be written repeatedly.
        if self.__initialized:
            return
        self.run_log_file = settings.RUN_LOG_FILE
        self.error_log_file = settings.ERROR_LOG_FILE
        self.run_logger = None
        self.error_logger = None
        self.initialize_run_log()
        self.initialize_error_log()
        self.__initialized = True

    @staticmethod
    def check_path_exist(log_abs_file):
        """Create the directory that will hold ``log_abs_file`` if it is missing.

        :param log_abs_file: path of the log file (the file itself is not created)
        """
        log_dir = os.path.dirname(log_abs_file)
        # A bare file name has no directory part; there is nothing to create.
        if log_dir:
            # makedirs also creates missing parents and tolerates an existing directory.
            os.makedirs(log_dir, exist_ok=True)

    @classmethod
    def _file_handler(cls, log_file):
        """Return an appending UTF-8 FileHandler that uses the shared line format."""
        handler = logging.FileHandler(log_file, 'a', encoding='utf-8')
        handler.setFormatter(logging.Formatter(fmt=cls._LOG_FORMAT))
        return handler

    def initialize_run_log(self):
        """Attach the run-log file to the global ``'run_log'`` logger (INFO and above)."""
        self.check_path_exist(self.run_log_file)
        run_logger = logging.getLogger('run_log')
        run_logger.setLevel(logging.INFO)
        # logging.getLogger returns the same object every time, so make sure
        # the file is not attached twice.
        target = os.path.abspath(self.run_log_file)
        already_attached = any(
            isinstance(handler, logging.FileHandler) and handler.baseFilename == target
            for handler in run_logger.handlers
        )
        if not already_attached:
            run_logger.addHandler(self._file_handler(self.run_log_file))
        self.run_logger = run_logger

    def initialize_error_log(self):
        """Create a standalone logger (ERROR and above) that writes to the error-log file."""
        self.check_path_exist(self.error_log_file)
        # Instantiated directly, so it is not registered in the logging
        # hierarchy. The original name 'run_log' was a copy/paste slip.
        error_logger = logging.Logger('error_log', level=logging.ERROR)
        error_logger.addHandler(self._file_handler(self.error_log_file))
        self.error_logger = error_logger

    def log(self, message, mode=True):
        """
        Write one log entry.

        :param message: text to record
        :param mode: True records a run message (INFO), False records an error (ERROR)
        :return: None
        """
        if mode:
            self.run_logger.info(message)
        else:
            self.error_logger.error(message)
调用方式:
# Code 1000 marks a successful update: announce it and record the message in
# the run log; any other code sends the message to the error log.
succeeded = ret['code'] == 1000
if succeeded:
    print(IP, '更新成功')
Logger().log(ret['message'], succeeded)
API认证:
客户端将密钥与时间戳拼接后取 MD5,并连同时间戳一起发送给服务端认证;时间戳超过超时时间则认证失败,已认证过的密钥串在过期后清空
客户端:
import hashlib
import time
import requests
class Client(object):
    """Small API client that authenticates each request with a time-stamped MD5 token."""

    def __init__(self):
        self.key = '299095cc-1330-11e5-b06a-a45e60bec08b'
        self.key_name = 'auth-key'
        self.asset_api = 'http://127.0.0.1:8000/api/'

    def auth_key(self):
        """
        Build the authentication header.

        The header value is ``"<md5 hex digest>|<timestamp>"``. The digest is
        an MD5 seeded with the key and then updated with ``"<key>|<timestamp>"``.

        :return: dict mapping the header name to the token string
        """
        # Format the timestamp once so the text that is hashed is exactly the
        # text that is sent.
        stamp = "%f" % time.time()
        digest = hashlib.md5(self.key.encode('utf-8'))  # the key doubles as the salt
        digest.update(("%s|%s" % (self.key, stamp)).encode('utf-8'))
        return {self.key_name: "%s|%s" % (digest.hexdigest(), stamp)}

    def get_asset(self, timeout=10):
        """
        Send an authenticated GET request to the asset API and print the response body.

        :param timeout: seconds to wait for the server before requests raises a
            timeout error; without it an unresponsive server blocks the call
            indefinitely
        """
        headers = {}
        headers.update(self.auth_key())
        response = requests.get(
            url=self.asset_api,
            headers=headers,
            timeout=timeout,
        )
        print(response.text)
# Example run: build a client and issue one authenticated request.
Client().get_asset()
服务端:
from django.views import View
import time
import hashlib
ASSET_AUTH_KEY = '299095cc-1330-11e5-b06a-a45e60bec08b' # key used for authentication
ASSET_AUTH_HEADER_NAME = 'HTTP_AUTH_KEY' # authentication header, looked up in request.META
ASSET_AUTH_TIME = 2 # timeout in seconds; compared against time.time()
ENCRYPT_LIST = [] # holds keys that have already been authenticated
def api_auth(request):
auth_key = request.META.get(ASSET_AUTH_HEADER_NAME)
if not auth_key: # 请求认证头不正确
return False
sp = auth_key.split('|')
if len(sp) != 2: # 格式不正确
return False
encrypt, timestamp = sp
timestamp = float(timestamp) # str换成float
limit_timestamp = time.time() - ASSET_AUTH_TIME
if limit_timestamp > timestamp: # 当前程序时间与客户端时间戳对比 超时
return False
ha = hashlib