控制台日志
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import logging
filename = 'test.log'
def loggers(): logger = logging.getLogger()
ch = logging.StreamHandler()
datafmt = "%Y-%m-%d %H:%M:%S" fmt = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s' formatter = logging.Formatter(fmt, datafmt)
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
logger.addHandler(ch)
return logging
if __name__ == "__main__": log = loggers() log.debug('Debug状态') log.info('输入状态') log.warning('警告级别错误') log.error('产生错误信息') log.critical('产生严重错误')
|
效果图
文件日志
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| import os import logging
filename = 'test.log'
def do_file(): current_path = os.path.dirname(__file__) base_path = os.path.dirname(current_path)
path = os.path.join(base_path, filename) print(f'{filename} -- 文件路径: {path}')
"""如果文件不存在就创建""" if not os.path.exists(path): try: fh = open(path, 'w') except IOError: print(f"Error: 创建 {filename} 文件成功!!!") else: print(f"创建 {filename} 文件成功!!!") fh.close() else: print(f"{filename} 文件已存在!!!")
def loggers(): logger = logging.getLogger()
fh = logging.FileHandler(filename, mode='a', encoding='utf-8')
ch = logging.StreamHandler()
datafmt = "%Y-%m-%d %H:%M:%S" fmt = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s' formatter = logging.Formatter(fmt, datafmt)
logger.setLevel(logging.DEBUG)
fh.setFormatter(formatter) ch.setFormatter(formatter)
logger.addHandler(fh) logger.addHandler(ch)
return logging
if __name__ == "__main__": log = loggers() log.debug('Debug状态') log.info('输入状态') log.warning('警告级别错误') log.error('产生错误信息') log.critical('产生严重错误')
|
效果图
控制台
文件
按时间切割日志
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| import logging import os.path import time
folder_name = "logs"
def my_logging(): log = logging.getLogger() log.setLevel(logging.DEBUG)
rq = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
log_path = os.path.dirname(os.getcwd()) + os.sep + folder_name + os.sep
"""如果文件夹不存在就创建""" os.makedirs(log_path.strip(), exist_ok=True)
logfile = log_path + rq + '.log' fh = logging.FileHandler(filename=logfile, mode='w', encoding='utf-8') fh.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG)
datafmt = "%Y-%m-%d %H:%M:%S" fmt = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s' formatter = logging.Formatter(fmt, datafmt) fh.setFormatter(formatter) ch.setFormatter(formatter)
log.addHandler(fh) log.addHandler(ch)
return log
if __name__ == '__main__': logger = my_logging() logger.debug('Debug状态') logger.info('输入状态') logger.warning('警告级别错误') logger.error('产生错误信息') logger.critical('产生严重错误')
|
效果演示
按时间切割自动日志
说明
使用的是 TimedRotatingFileHandler( filename [, when [, interval [, backupCount] ] ] )
函数
filename
输出日志的文件名when
一个字符串,定义了日志切分的间隔时间单位- S:Second 秒
- M:Minutes 分钟
- H:Hour 小时
- D:Days 天
- W:Week day(0 = Monday)
- midnight:Roll over at midnight
interval
间隔时间单位的个数,指等待多少个 when
的时间后 Logger 会自动重建新闻继续进行日志记录backupCount
保留日志的文件个数,超过删除最早的日志
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| import logging import os from logging import handlers
folder_name = "logs"
loggerName = "log"
level = "debug"
def do_folder(): current_path = os.path.dirname(__file__)
base_path = os.path.abspath(os.path.join(current_path, ".."))
log_folder = base_path + os.sep + folder_name + os.sep """如果文件夹不存在就创建""" os.makedirs(log_folder.strip(), exist_ok=True)
return log_folder
def my_logging(): level_relations = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL }
log_folder = do_folder() log_file = log_folder + loggerName
fh = handlers.TimedRotatingFileHandler(filename=log_file, when='S', encoding='utf-8', backupCount=3, delay=True)
fh.suffix = "%Y-%m-%d_%H-%M-%S.log"
ch = logging.StreamHandler()
date_fmt = '%Y-%m-%d %H:%M:%S' text_fmt = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s' format_str = logging.Formatter(text_fmt, date_fmt)
fh.setFormatter(format_str) ch.setFormatter(format_str)
log = logging.getLogger() log.setLevel(level_relations.get(level))
log.addHandler(ch) log.addHandler(fh)
return log
if __name__ == '__main__': logger = my_logging() logger.debug('Debug状态') logger.info('输入状态') logger.warning('警告级别错误') logger.error('产生错误信息') logger.critical('产生严重错误')
|
4.3 效果演示
封装按时间切割日志函数
说明
- 使用的函数也是
TimedRotatingFileHandler
- 只不过我不喜欢生成日志的名字,自己封装了一下
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| import logging import os import re from logging.handlers import TimedRotatingFileHandler
folderName = "logs"
loggerName = "log"
level = "debug"
LEVELS = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL }
def create_folder(): current_path = os.path.dirname(__file__)
base_path = os.path.abspath(os.path.join(current_path, ".."))
log_dir = base_path + os.sep + folderName + os.sep
"""如果文件夹不存在就创建""" os.makedirs(log_dir.strip(), exist_ok=True)
return log_dir
class MyTimedRotatingFileHandler(TimedRotatingFileHandler): """重写 getFilesToDelete 方法"""
def getFilesToDelete(self): dirName = create_folder() fileNames = os.listdir(dirName) result = [] for fileName in fileNames: result.append(os.path.join(dirName, fileName)) if len(result) < self.backupCount: result = [] else: result.sort() result = result[:len(result) - self.backupCount] return result
def my_logger(): log_folder = create_folder() log_file = log_folder + loggerName
fh = MyTimedRotatingFileHandler(filename=log_file, when='S', backupCount=3, encoding='utf-8', delay=True) fh.suffix = "%Y-%m-%d_%H-%M-%S.log" fh.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}.log$")
def namer(filename): return filename.replace(loggerName + ".", "")
fh.namer = namer
ch = logging.StreamHandler()
datefmt = '%Y-%m-%d %H:%M:%S' format = '%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s \n' format_str = logging.Formatter(format, datefmt)
fh.setFormatter(format_str) ch.setFormatter(format_str)
log = logging.getLogger() log.setLevel(LEVELS.get(level))
log.addHandler(fh) log.addHandler(ch)
return log
if __name__ == '__main__': logger = my_logger() logger.debug('Debug状态') logger.info('输入状态') logger.warning('警告级别错误') logger.error('产生错误信息') logger.critical('产生严重错误')
|
效果演示
带线程锁的切割日志函数
说明
- 虽然
TimedRotatingFileHandler
函数有线程安全的功能,但是在多线程下会出错,查看了网上了多进程的方法,没有弄出来!!! - 于是改变了思路,使用线程锁(多进程单例)!!!
代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| import os import re import threading
import logging import time
from logging.handlers import TimedRotatingFileHandler
def create_folder(folder_name): current_path = os.path.dirname(__file__)
base_path = os.path.abspath(os.path.join(current_path, ".."))
log_dir = base_path + os.sep + folder_name + os.sep
"""如果文件夹不存在就创建""" os.makedirs(log_dir.strip(), exist_ok=True)
return log_dir
class MyTimedRotatingFileHandler(TimedRotatingFileHandler): """重写 getFilesToDelete 方法"""
def getFilesToDelete(self): dirName = os.path.abspath(os.path.join(self.baseFilename, "..")) fileNames = os.listdir(dirName) result = [] for fileName in fileNames: result.append(os.path.join(dirName, fileName)) if len(result) < self.backupCount: result = [] else: result.sort() result = result[:len(result) - self.backupCount] return result
class MyLogger: _instance_lock = threading.Lock()
def start(self, *args, **kwargs): self.folderName = (kwargs["folderName"] if kwargs.get("folderName") else "logs") self.logname = (kwargs["tempName"] if kwargs.get("tempName") else "log") self.level = (kwargs["level"] if kwargs.get("level") else "debug") self.datefmt = (kwargs["datefmt"] if kwargs.get("datefmt") else None) self.fmt = (kwargs["format"] if kwargs.get("format") else "%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") self.console = (kwargs["console"] if kwargs.get("console") else True) self.file = (kwargs["file"] if kwargs.get("file") else (create_folder(self.folderName) + self.logname)) self.when = (kwargs["when"] if kwargs.get("when") else "D") self.backCount = (kwargs["backCount"] if kwargs.get("backCount") else 30)
self.level_relations = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL } self.logger = logging.getLogger(__name__) self.format = logging.Formatter(self.fmt, self.datefmt)
if not self.level_relations.get(self.level): self.level = "debug" self.logger.setLevel(self.level_relations[self.level]) if self.console: streamHandler = logging.StreamHandler() streamHandler.setFormatter(self.format) self.logger.addHandler(streamHandler) if self.file: timeHandler = MyTimedRotatingFileHandler( filename=self.file, when=self.when, backupCount=self.backCount, encoding="utf-8" ) timeHandler.suffix = "%Y-%m-%d_%H-%M-%S.log" timeHandler.extMatch = re.compile(r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}.log$")
def namer(filename): return filename.replace(self.logname + ".", "")
timeHandler.namer = namer timeHandler.setFormatter(self.format) self.logger.addHandler(timeHandler)
def __new__(cls, *args, **kwargs): if not hasattr(MyLogger, "_instance"): with MyLogger._instance_lock: if not hasattr(MyLogger, "_instance"): MyLogger._instance = object.__new__(cls) MyLogger._instance.start(*args, **kwargs)
time.sleep(1) return MyLogger._instance.logger
if __name__ == '__main__': logger = MyLogger( folderName="logs", tempName="log", datefmt='%Y-%m-%d %H:%M:%S', when="S", backCount=3 ) logger.debug('Debug状态') logger.info('输入状态') logger.warning('警告级别错误') logger.error('产生错误信息') logger.critical('产生严重错误')
|
效果演示
推荐使用的日志模块