From 124bf35d58e9e6ae739531bda1d35a7a6b541a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Juge?= Date: Fri, 20 Nov 2020 15:58:50 +0100 Subject: [PATCH 1/3] #781 Add syslog stream for watchers Mostly copied from Syslog logging handler --- circus/stream/__init__.py | 1 + circus/stream/syslog_stream.py | 142 +++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 circus/stream/syslog_stream.py diff --git a/circus/stream/__init__.py b/circus/stream/__init__.py index 6aa0c56d3..b36144522 100644 --- a/circus/stream/__init__.py +++ b/circus/stream/__init__.py @@ -8,6 +8,7 @@ from circus.stream.file_stream import FileStream from circus.stream.file_stream import WatchedFileStream # noqa: F401 from circus.stream.file_stream import TimedRotatingFileStream # noqa: F401 +from circus.stream.syslog_stream import SyslogStream # noqa: F401 from circus.stream.redirector import Redirector # noqa: F401 diff --git a/circus/stream/syslog_stream.py b/circus/stream/syslog_stream.py new file mode 100644 index 000000000..b9229edec --- /dev/null +++ b/circus/stream/syslog_stream.py @@ -0,0 +1,142 @@ +from urllib.parse import urlparse +import logging +import socket +import syslog + +from circus.util import to_str + + +class SyslogStream(object): + + # priorities (these are ordered) + LOG_EMERG = 0 # system is unusable + LOG_ALERT = 1 # action must be taken immediately + LOG_CRIT = 2 # critical conditions + LOG_ERR = 3 # error conditions + LOG_WARNING = 4 # warning conditions + LOG_NOTICE = 5 # normal but significant condition + LOG_INFO = 6 # informational + LOG_DEBUG = 7 # debug-level messages + + # facility codes + LOG_KERN = 0 # kernel messages + LOG_USER = 1 # random user-level messages + LOG_MAIL = 2 # mail system + LOG_DAEMON = 3 # system daemons + LOG_AUTH = 4 # security/authorization messages + LOG_SYSLOG = 5 # messages generated internally by syslogd + LOG_LPR = 6 # line printer subsystem + LOG_NEWS = 7 # network news subsystem + LOG_UUCP = 8 # UUCP subsystem + LOG_CRON = 9 # clock daemon + LOG_AUTHPRIV = 10 # security/authorization messages (private) + LOG_FTP = 11 # FTP daemon + + # other codes through 15 reserved for system use + LOG_LOCAL0 = 16 # reserved for local use + LOG_LOCAL1 = 17 # reserved for local use + LOG_LOCAL2 = 18 # reserved for local use + LOG_LOCAL3 = 19 # reserved for local use + LOG_LOCAL4 = 20 # reserved for local use + LOG_LOCAL5 = 21 # reserved for local use + LOG_LOCAL6 = 22 # reserved for local use + LOG_LOCAL7 = 23 # reserved for local use + + facility_names = { + "auth": LOG_AUTH, + "authpriv": LOG_AUTHPRIV, + "cron": LOG_CRON, + "daemon": LOG_DAEMON, + "ftp": LOG_FTP, + "kern": LOG_KERN, + "lpr": LOG_LPR, + "mail": LOG_MAIL, + "news": LOG_NEWS, + "security": LOG_AUTH, # DEPRECATED + "syslog": LOG_SYSLOG, + "user": LOG_USER, + "uucp": LOG_UUCP, + "local0": LOG_LOCAL0, + "local1": LOG_LOCAL1, + "local2": LOG_LOCAL2, + "local3": LOG_LOCAL3, + "local4": LOG_LOCAL4, + "local5": LOG_LOCAL5, + "local6": LOG_LOCAL6, + "local7": LOG_LOCAL7, + } + + def __init__(self, syslog_url, ident=None): + self.socktype = None + info = urlparse(syslog_url) + facility = 'user' + if info.query in logging.handlers.SysLogHandler.facility_names: + facility = info.query + if info.netloc: + address = (info.hostname, info.port or 514) + else: + address = info.path + + if ident: + self.ident = ident + else: + self.ident = '' + self.address = address + self.facility = facility + self.init_syslog(address) + + def init_syslog(self, address): + if isinstance(address, str): + self.unixsocket = True + syslog.openlog(self.ident) + else: + self.unixsocket = False + socktype = socket.SOCK_DGRAM + host, port = address + ress = socket.getaddrinfo(host, port, 0, socktype) + if not ress: + raise OSError("getaddrinfo returns an empty list") + for res in ress: + af, socktype, proto, _, sa = res + err = sock = None + try: + sock = socket.socket(af, socktype, proto) + if socktype == socket.SOCK_STREAM: + sock.connect(sa) + break + except OSError as exc: + err = exc + if sock is not None: + sock.close() + if err is not None: + raise err + self.socket = sock + self.socktype = socktype + + def write_data(self, data): + # data to write to syslog + syslog_data = to_str(data['data']) + if self.unixsocket: + # facility dealt with when opening syslog + # priority forced to LOG_INFO + syslog.syslog(syslog_data) + else: + if self.ident: + syslog_data = self.ident + ":" + syslog_data + facility = self.facility_names[self.facility] + priority = self.LOG_INFO + priority = (facility << 3) | priority + syslog_data = "<" + str(priority) + ">" + syslog_data + self.socket.sendto(syslog_data.encode('utf-8'), self.address) + + def __call__(self, data): + self.write_data(data) + + def close(self): + """ + Closes the socket. + """ + if self.unixsocket: + syslog.closelog() + else: + self.socket.close() From 7a3da5f7fb9e7141f7f6d958da0358089869c8c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Juge?= Date: Fri, 20 Nov 2020 16:00:59 +0100 Subject: [PATCH 2/3] #781 Add basic test for logging to syslog UDP --- circus/tests/test_stream.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/circus/tests/test_stream.py b/circus/tests/test_stream.py index ecd9605b8..c41e07b6d 100644 --- a/circus/tests/test_stream.py +++ b/circus/tests/test_stream.py @@ -4,6 +4,7 @@ import os import tempfile import tornado +import socket from datetime import datetime @@ -13,6 +14,7 @@ from circus.stream import FileStream, WatchedFileStream from circus.stream import TimedRotatingFileStream from circus.stream import FancyStdoutStream +from circus.stream import SyslogStream def run_process(testfile, *args, **kw): @@ -370,4 +372,29 @@ def test_move_file(self): os.unlink(file1) +class TestSyslogStream(TestCase): + stream_class = SyslogStream + + def get_stream(self, *args, **kw): + # need a constant timestamp + stream = self.stream_class(*args, **kw) + return stream + + def test_syslog_udp(self): + local_ip = "127.0.0.1" + local_port = 2514 + buffer_size = 1024 + UDP_socket = socket.socket( + family=socket.AF_INET, type=socket.SOCK_DGRAM) + UDP_socket.bind((local_ip, local_port)) + stream = self.get_stream( + "syslog://"+str(local_ip)+":"+str(local_port), "test_udp") + stream({'data': "TEST SYSLOG UDP"}) + stream.close() + recvd = UDP_socket.recvfrom(buffer_size) + message = recvd[0] + self.assertEqual( + message, "<14>test_udp:TEST SYSLOG UDP".encode("utf-8")) + + test_suite = EasyTestSuite(__name__) From 523338085db44ddde91b2cc25ac9c6e026197ab0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Juge?= Date: Fri, 20 Nov 2020 16:01:28 +0100 Subject: [PATCH 3/3] #781 Update changelog --- docs/source/changelog.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index 3612f1402..f4110488f 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -5,6 +5,7 @@ Changelog history ----------------- - Nothing changed yet +- Add syslog logging for watchers (local and UDP) - #781 0.17.1 2020-09-18 -----------------