|
1 | 1 | import asyncio |
2 | | -import json |
3 | 2 | import httpx |
| 3 | +import json |
| 4 | +import math |
| 5 | +import os |
4 | 6 | import socket |
5 | 7 | import ssl |
| 8 | +import struct |
| 9 | +import zlib |
6 | 10 |
|
7 | 11 | from typing import Optional |
8 | 12 |
|
@@ -60,7 +64,7 @@ async def tcp_handler(self, massage): |
60 | 64 | """ |
61 | 65 | tcp handler for send logs to Graylog Input with type: gelf tcp |
62 | 66 | :param massage: input message |
63 | | - :return: |
| 67 | + :return: Exception |
64 | 68 | """ |
65 | 69 | gelf_message = GelfBase.make(self, massage) |
66 | 70 | """ Transforming GELF dictionary into bytes """ |
@@ -163,3 +167,64 @@ async def http_handler(self, message): |
163 | 167 | return f"{type(e).__name__} at line {e.__traceback__.tb_lineno} of {__file__}: {e}" |
164 | 168 |
|
165 | 169 | return getattr(e, 'message', repr(e)) |
| 170 | + |
| 171 | + |
| 172 | +class GelfUdp(GelfBase): |
| 173 | + async def udp_handler(self, message): |
| 174 | + """ |
| 175 | + UDP handler for send logs to Graylog Input with type: gelf udp |
| 176 | + :param message: input message |
| 177 | + :return: Message send error in next case: message size more than 1048576 bytes |
| 178 | + """ |
| 179 | + """ |
| 180 | + Declaring limits for GELF messages |
| 181 | + """ |
| 182 | + max_chunk_size = 8192 |
| 183 | + max_chunk_count = 128 |
| 184 | + |
| 185 | + client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
| 186 | + |
| 187 | + gelf_message = GelfBase.make(self, message) |
| 188 | + bytes_msg = json.dumps(gelf_message).encode('utf-8') |
| 189 | + |
| 190 | + if self.compress: |
| 191 | + bytes_msg = zlib.compress(bytes_msg, level=1) |
| 192 | + """ |
| 193 | + Checking the message size. |
| 194 | + """ |
| 195 | + if len(bytes_msg) > max_chunk_size: |
| 196 | + total_chunks = int(math.ceil(len(bytes_msg) / max_chunk_size)) |
| 197 | + |
| 198 | + if total_chunks > max_chunk_count: |
| 199 | + return "Error. Your message couldn't be sent because it's too large." |
| 200 | + |
| 201 | + chunks = [bytes(bytes_msg)[i: i + max_chunk_size] for i in range(0, len(bytes(bytes_msg)), max_chunk_size)] |
| 202 | + |
| 203 | + async for i in self.make_gelf_chunks(chunks, total_chunks): |
| 204 | + client_socket.sendto(i, ( |
| 205 | + self.host, |
| 206 | + self.port |
| 207 | + )) |
| 208 | + |
| 209 | + client_socket.sendto(bytes_msg, ( |
| 210 | + self.host, |
| 211 | + self.port |
| 212 | + )) |
| 213 | + |
| 214 | + async def make_gelf_chunks(self, chunks, total_chunks): |
| 215 | + """ |
| 216 | + Each chunk is padded with overhead to match the GELF specification. |
| 217 | + :param chunks: Chunked gelf_message |
| 218 | + :param total_chunks: The total number of chunks a GELF message requires to send |
| 219 | + :return: |
| 220 | + """ |
| 221 | + message_id = os.urandom(8) |
| 222 | + |
| 223 | + for chunk_index, chunk in enumerate(chunks): |
| 224 | + yield b''.join(( |
| 225 | + b'\0x1e\0x0f', |
| 226 | + message_id, |
| 227 | + struct.pack('b', chunk_index), |
| 228 | + struct.pack('b', total_chunks), |
| 229 | + chunk |
| 230 | + )) |
0 commit comments