netflow, aggregator replace flowd with our new implementation

This commit is contained in:
Ad Schellevis 2019-05-16 18:32:55 +02:00
parent 8aecf38f2e
commit 17e4e9c0fc

View File

@ -26,27 +26,11 @@
--------------------------------------------------------------------------------------
parse flowd log files
"""
import flowd
import glob
import tempfile
import subprocess
import os
# define field
PARSE_FLOW_FIELDS = [
{'check': flowd.FIELD_OCTETS, 'target': 'octets'},
{'check': flowd.FIELD_PACKETS, 'target': 'packets'},
{'check': flowd.FIELD_SRC_ADDR, 'target': 'src_addr'},
{'check': flowd.FIELD_DST_ADDR, 'target': 'dst_addr'},
{'check': flowd.FIELD_SRCDST_PORT, 'target': 'src_port'},
{'check': flowd.FIELD_SRCDST_PORT, 'target': 'dst_port'},
{'check': flowd.FIELD_PROTO_FLAGS_TOS, 'target': 'protocol'},
{'check': flowd.FIELD_PROTO_FLAGS_TOS, 'target': 'tcp_flags'},
{'check': flowd.FIELD_PROTO_FLAGS_TOS, 'target': 'tos'},
{'check': flowd.FIELD_IF_INDICES, 'target': 'if_ndx_in'},
{'check': flowd.FIELD_IF_INDICES, 'target': 'if_ndx_out'},
{'check': flowd.FIELD_GATEWAY_ADDR, 'target': 'gateway_addr'},
{'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}]
from lib.flowparser import FlowParser
class Interfaces(object):
@ -87,30 +71,15 @@ def parse_flow(recv_stamp, flowd_source='/var/log/flowd.log'):
if parse_done:
# log file contains older data (recv_stamp), break
break
flog = flowd.FlowLog(filename)
for flow in flog:
flow_record = dict()
if flow.has_field(flowd.FIELD_RECV_TIME):
# receive timestamp
flow_record['recv'] = flow.recv_sec
if flow_record['recv'] <= recv_stamp:
# do not parse next flow archive (oldest reached)
parse_done = True
continue
if flow.has_field(flowd.FIELD_FLOW_TIMES):
# calculate flow start, end, duration in ms
flow_record['flow_end'] = flow.recv_sec - (flow.sys_uptime_ms - flow.flow_finish) / 1000.0
flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start)
flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms'] / 1000.0
# handle source data
for flow_field in PARSE_FLOW_FIELDS:
if flow.has_field(flow_field['check']):
flow_record[flow_field['target']] = getattr(flow, flow_field['target'])
else:
flow_record[flow_field['target']] = None
# map interface indexes to actual interface names
flow_record['if_in'] = interfaces.if_device(flow_record['if_ndx_in'])
flow_record['if_out'] = interfaces.if_device(flow_record['if_ndx_out'])
yield flow_record
for flow_record in FlowParser(filename):
if flow_record['recv_sec'] <= recv_stamp:
# do not parse next flow archive (oldest reached)
parse_done = True
continue
# map interface indexes to actual interface names
flow_record['if_in'] = interfaces.if_device(flow_record['if_ndx_in'])
flow_record['if_out'] = interfaces.if_device(flow_record['if_ndx_out'])
yield flow_record
# send None to mark last record
yield None