python zmq實現日志服務
ZMQ(zeromq) 是一個消息處理隊列庫 模式有以下三種
1、Request-Reply模式:客戶端在請求后,服務端必須響應
2、Publish-Subscribe模式:廣播所有client,沒有隊列緩存,斷開連接數據將永遠丟失。client可以進行數據過濾。
3、Parallel Pipeline模式:push進行數據推送,work進行數據緩存,pull進行數據競爭獲取處理。區別于Publish-Subscribe存在一個數據緩存和處理負載。當連接被斷開,數據不會丟失,重連后數據繼續發送到對端。

日志接口及客戶端
1 import time
2 import logging
3 import traceback
4 from io import StringIO
5
6 import zmq
7 from tornado.options import options
8
9
10 class ZMQHandler(logging.Handler):
11 def __init__(self, host, port):
12 super().__init__()
13 self.tcp_url = "tcp://{0}:{1}".format(host, port)
14 context = zmq.Context()
15 self.zq = context.socket(zmq.PUSH)
16 self.zq.connect(self.tcp_url)
17
18 def emit(self, record):
19 try:
20 self.zq.send_pyobj(record, flags=zmq.NOBLOCK, protocol=4)
21 except Exception:
22 fp = StringIO()
23 traceback.print_exc(file=fp)
24 error = fp.getvalue()
25 logger.log(logging.ERROR, error)
26 # logger.log(logging.ERROR, "\n".join(error.split("\n")[:-10]))
27
28
29 class ZMQListener(object):
30 def __init__(self, host, port, *handlers, respect_handler_level=False):
31 self.tcp_url = "tcp://{0}:{1}".format(host, port)
32 self.handlers = handlers
33 self.respect_handler_level = respect_handler_level
34 self.context = None
35 self.zq = None
36 self.connect()
37
38 def connect(self):
39 self.context = zmq.Context()
40 self.zq = self.context.socket(zmq.PULL)
41 self.zq.bind(self.tcp_url)
42
43 def close(self):
44 self.zq.close()
45 self.context.term()
46 self.context = None
47 self.zq = None
48
49 def handle(self, record):
50 """
51 Handle a record.
52
53 This just loops through the handlers offering them the record
54 to handle.
55 """
56 for handler in self.handlers:
57 if not self.respect_handler_level:
58 process = True
59 else:
60 process = record.levelno >= handler.level
61
62 if process:
63 handler.handle(record)
64
65 def run(self):
66 while True:
67 try:
68 record = self.zq.recv_pyobj(flags=zmq.NOBLOCK)
69 if record.msg == "EOF":
70 break
71 self.handle(record)
72 except zmq.ZMQError:
73 time.sleep(1)
74 except Exception as e:
75 print(e)
76
77
78 logger = logging.getLogger()
79 zmq_handler = ZMQHandler("127.0.0.1", 9825)
80 logger.setLevel(logging.DEBUG)
81 logger.addHandler(zmq_handler)
82
83
84 def test_zmq():
85 zq_handler = ZMQHandler("127.0.0.1", 9825)
86
87 rootLogger = logging.getLogger('')
88 rootLogger.setLevel(logging.WARN)
89 # socketHandler = logging.handlers.SocketHandler('localhost',
90 # logging.handlers.DEFAULT_TCP_LOGGING_PORT)
91 # don't bother with a formatter, since a socket handler sends the event as
92 # an unformatted pickle
93 rootLogger.addHandler(zq_handler)
94
95 # Now, we can log to the root logger, or any other logger. First the root
96 logging.info('Jackdaws love my big sphinx of quartz.')
97
98 # Now, define a couple of other loggers which might represent areas in your
99 # application:
100
101 logger1 = logging.getLogger('myapp.area1')
102 logger2 = logging.getLogger('myapp.area2')
103
104 logger1.debug('Quick zephyrs blow, vexing daft Jim.')
105 logger1.info('How quickly daft jumping zebras vex.')
106 logger2.warning('Jail zesty vixen who grabbed pay from quack.')
107 logger2.error('The five boxing wizards jump quickly.')
108
109
110 def test_zmq_listener():
111 handler = logging.StreamHandler()
112 formatter = logging.Formatter('%(name)s: %(message)s')
113 handler.setFormatter(formatter)
114 zq_listener = ZMQListener("127.0.0.1", 8090, handler)
115 zq_listener.run()
116


1 import time
2 import logging
3 import traceback
4 from io import StringIO
5
6 import zmq
7 from tornado.options import options
8
9
10 class ZMQHandler(logging.Handler):
11 def __init__(self, host, port):
12 super().__init__()
13 self.tcp_url = "tcp://{0}:{1}".format(host, port)
14 context = zmq.Context()
15 self.zq = context.socket(zmq.PUSH)
16 self.zq.connect(self.tcp_url)
17
18 def emit(self, record):
19 try:
20 self.zq.send_pyobj(record, flags=zmq.NOBLOCK, protocol=4)
21 except Exception:
22 fp = StringIO()
23 traceback.print_exc(file=fp)
24 error = fp.getvalue()
25 logger.log(logging.ERROR, error)
26 # logger.log(logging.ERROR, "\n".join(error.split("\n")[:-10]))
27
28
29 class ZMQListener(object):
30 def __init__(self, host, port, *handlers, respect_handler_level=False):
31 self.tcp_url = "tcp://{0}:{1}".format(host, port)
32 self.handlers = handlers
33 self.respect_handler_level = respect_handler_level
34 self.context = None
35 self.zq = None
36 self.connect()
37
38 def connect(self):
39 self.context = zmq.Context()
40 self.zq = self.context.socket(zmq.PULL)
41 self.zq.bind(self.tcp_url)
42
43 def close(self):
44 self.zq.close()
45 self.context.term()
46 self.context = None
47 self.zq = None
48
49 def handle(self, record):
50 """
51 Handle a record.
52
53 This just loops through the handlers offering them the record
54 to handle.
55 """
56 for handler in self.handlers:
57 if not self.respect_handler_level:
58 process = True
59 else:
60 process = record.levelno >= handler.level
61
62 if process:
63 handler.handle(record)
64
65 def run(self):
66 while True:
67 try:
68 record = self.zq.recv_pyobj(flags=zmq.NOBLOCK)
69 if record.msg == "EOF":
70 break
71 self.handle(record)
72 except zmq.ZMQError:
73 time.sleep(1)
74 except Exception as e:
75 print(e)
76
77
78 logger = logging.getLogger()
79 zmq_handler = ZMQHandler("127.0.0.1", 9825)
80 logger.setLevel(logging.DEBUG)
81 logger.addHandler(zmq_handler)
82
83
84 def test_zmq():
85 zq_handler = ZMQHandler("127.0.0.1", 9825)
86
87 rootLogger = logging.getLogger('')
88 rootLogger.setLevel(logging.WARN)
89 # socketHandler = logging.handlers.SocketHandler('localhost',
90 # logging.handlers.DEFAULT_TCP_LOGGING_PORT)
91 # don't bother with a formatter, since a socket handler sends the event as
92 # an unformatted pickle
93 rootLogger.addHandler(zq_handler)
94
95 # Now, we can log to the root logger, or any other logger. First the root

96 logging.info('Jackdaws love my big sphinx of quartz.')
97
98 # Now, define a couple of other loggers which might represent areas in your
99 # application:
100
101 logger1 = logging.getLogger('myapp.area1')
102 logger2 = logging.getLogger('myapp.area2')
103
104 logger1.debug('Quick zephyrs blow, vexing daft Jim.')
105 logger1.info('How quickly daft jumping zebras vex.')
106 logger2.warning('Jail zesty vixen who grabbed pay from quack.')
107 logger2.error('The five boxing wizards jump quickly.')
108
109
110 def test_zmq_listener():
111 handler = logging.StreamHandler()
112 formatter = logging.Formatter('%(name)s: %(message)s')
113 handler.setFormatter(formatter)
114 zq_listener = ZMQListener("127.0.0.1", 8090, handler)
115 zq_listener.run()
116
代碼中顯示用的zmq的第三種模式處理日志

服務端
1 import logging
2 import os
3 import sys
4 from logging.handlers import TimedRotatingFileHandler
5 from tornado.options import options, define
6
7 from settings import log_dir
8
9 define("logger_host", "127.0.0.1", type=int)
10 define("logger_port", 9825, type=int)
11 define("logger_dir", log_dir, type=str)
12 define("redis_host", "127.0.0.1", type=str)
13 define("redis_port", 6379, type=int)
14 define("redis_password", None, type=str)
15 define("redis_db", 12, type=int)
16 options.parse_command_line()
17 stream_handler = logging.StreamHandler()
18
19 formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s')
20 stream_handler.setFormatter(formatter)
21
22 file_handler = TimedRotatingFileHandler(
23 os.path.join(options.logger_dir, "{0}".format(options.logger_port)), backupCount=18, when='h',interval=4)
24 file_handler.suffix="%Y-%m-%d_%H-%M-%S.log"
25 file_handler.setLevel(logging.DEBUG)
26 file_handler.formatter = formatter
27 from core.logger import ZMQListener
28 zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler)
29 zmq_listener.run()
30
直接用python啟動服務器就可以,這樣可實現日志的遠程寫入


1 import logging
2 import os
3 import sys
4 from logging.handlers import TimedRotatingFileHandler
5 from tornado.options import options, define
6
7 from settings import log_dir
8
9 define("logger_host", "127.0.0.1", type=int)
10 define("logger_port", 9825, type=int)
11 define("logger_dir", log_dir, type=str)
12 define("redis_host", "127.0.0.1", type=str)
13 define("redis_port", 6379, type=int)
14 define("redis_password", None, type=str)
15 define("redis_db", 12, type=int)
16 options.parse_command_line()
17 stream_handler = logging.StreamHandler()
18
19 formatter = logging.Formatter('%(asctime)s [%(filename)s:%(lineno)s]: %(message)s')
20 stream_handler.setFormatter(formatter)
21
22 file_handler = TimedRotatingFileHandler(
23 os.path.join(options.logger_dir, "{0}".format(options.logger_port)), backupCount=18, when='h',interval=4)
24 file_handler.suffix="%Y-%m-%d_%H-%M-%S.log"
25 file_handler.setLevel(logging.DEBUG)
26 file_handler.formatter = formatter
27 from core.logger import ZMQListener
28 zmq_listener = ZMQListener(options.logger_host, options.logger_port, file_handler)
29 zmq_listener.run()
30
posted on 2019-09-21 17:01 Benjamin 閱讀(908) 評論(0) 編輯 收藏 引用 所屬分類: python