netflow, flow log parser improvements

- faster / cleaner ipv4 conversion
- ipv6 conversion in compressed format, which equals flowd previous output
- unpack source and dest ports
This commit is contained in:
Ad Schellevis 2019-05-16 16:20:34 +02:00
parent 0bdbdf7d21
commit 5eef7248a8

View File

@ -27,7 +27,7 @@
flowd log parser
"""
import struct
import socket
from socket import inet_ntop, AF_INET, AF_INET6, ntohl
class FlowParser:
@ -54,19 +54,19 @@ class FlowParser:
'flow_engine_info'
]
# extract definition
# extract definition, integer values are read as rawdata (not parsed)
field_definition = {
'tag': 'I',
'recv_time': '>II',
'proto_flags_tos': 'BBBB',
'agent_addr4': 'BBBB',
'agent_addr6': 'BBBBBBBBBBBBBBBB',
'src_addr4': 'BBBB',
'src_addr6': 'BBBBBBBBBBBBBBBB',
'dst_addr4': 'BBBB',
'dst_addr6': 'BBBBBBBBBBBBBBBB',
'gateway_addr4': 'BBBB',
'gateway_addr6': 'BBBBBBBBBBBBBBBB',
'agent_addr4': 4,
'agent_addr6': 16,
'src_addr4': 4,
'src_addr6': 16,
'dst_addr4': 4,
'dst_addr6': 16,
'gateway_addr4': 4,
'gateway_addr6': 16,
'srcdst_port': '>HH',
'packets': '>Q',
'octets': '>Q',
@ -107,12 +107,16 @@ class FlowParser:
if self._pow[idx] & data_fields:
fieldname = self.field_definition_order[idx]
if fieldname in self.field_definition:
fsize = self.calculate_size(self.field_definition[fieldname])
content = struct.unpack(
self.field_definition[fieldname],
raw_data[raw_data_idx:raw_data_idx + fsize]
)
raw_record[fieldname] = content[0] if len(content) == 1 else content
if type(self.field_definition[fieldname]) is int:
fsize = self.field_definition[fieldname]
raw_record[fieldname] = raw_data[raw_data_idx:raw_data_idx + fsize]
else:
fsize = self.calculate_size(self.field_definition[fieldname])
content = struct.unpack(
self.field_definition[fieldname],
raw_data[raw_data_idx:raw_data_idx + fsize]
)
raw_record[fieldname] = content[0] if len(content) == 1 else content
raw_data_idx += fsize
return raw_record
@ -122,8 +126,6 @@ class FlowParser:
:return:
"""
# pre-compile address formatters to save time
ip_formatv4 = ".".join(["{}" for i in xrange(4)])
ip_formatv6 = ":".join(["{}" for i in xrange(16)])
with open(self._filename, 'rb') as flowh:
while True:
# header [version, len_words, reserved, fields]
@ -133,7 +135,7 @@ class FlowParser:
header = struct.unpack('BBHI', hdata)
record = self._parse_binary(
raw_data=flowh.read(header[1] * 4),
data_fields=socket.ntohl(header[3])
data_fields=ntohl(header[3])
)
record['sys_uptime_ms'] = record['agent_info'][0]
record['netflow_ver'] = record['agent_info'][3]
@ -146,14 +148,20 @@ class FlowParser:
if 'flow_times' in record:
record['flow_start'] = record['flow_times'][0]
record['flow_finish'] = record['flow_times'][1]
if 'if_indices' in record:
record['if_ndx_in'] = record['if_indices'][0]
record['if_ndx_out'] = record['if_indices'][1]
if 'srcdst_port' in record:
record['src_port'] = record['srcdst_port'][0]
record['dst_port'] = record['srcdst_port'][1]
# concat ipv4/v6 fields into field without [4,6]
for key in self.field_definition_order:
if key in record:
if key[-1] == '4' and len(record[key]) == 4:
record[key[:-1]] = ip_formatv4.format(*record[key])
elif key[-1] == '6' and len(record[key]) == 16:
record[key[:-1]] = ip_formatv6.format(*record[key])
if key[-1] == '4':
record[key[:-1]] = inet_ntop(AF_INET, record[key])
elif key[-1] == '6':
record[key[:-1]] = inet_ntop(AF_INET6, record[key])
# calculated values
record['flow_end'] = record['recv_sec'] - (record['sys_uptime_ms'] - record['flow_finish']) / 1000.0
record['duration_ms'] = (record['flow_finish'] - record['flow_start'])