diff --git a/src/opnsense/scripts/netflow/lib/parse.py b/src/opnsense/scripts/netflow/lib/parse.py index af6cca6d8..c7f97d518 100755 --- a/src/opnsense/scripts/netflow/lib/parse.py +++ b/src/opnsense/scripts/netflow/lib/parse.py @@ -26,27 +26,11 @@ -------------------------------------------------------------------------------------- parse flowd log files """ -import flowd import glob import tempfile import subprocess import os - -# define field -PARSE_FLOW_FIELDS = [ - {'check': flowd.FIELD_OCTETS, 'target': 'octets'}, - {'check': flowd.FIELD_PACKETS, 'target': 'packets'}, - {'check': flowd.FIELD_SRC_ADDR, 'target': 'src_addr'}, - {'check': flowd.FIELD_DST_ADDR, 'target': 'dst_addr'}, - {'check': flowd.FIELD_SRCDST_PORT, 'target': 'src_port'}, - {'check': flowd.FIELD_SRCDST_PORT, 'target': 'dst_port'}, - {'check': flowd.FIELD_PROTO_FLAGS_TOS, 'target': 'protocol'}, - {'check': flowd.FIELD_PROTO_FLAGS_TOS, 'target': 'tcp_flags'}, - {'check': flowd.FIELD_PROTO_FLAGS_TOS, 'target': 'tos'}, - {'check': flowd.FIELD_IF_INDICES, 'target': 'if_ndx_in'}, - {'check': flowd.FIELD_IF_INDICES, 'target': 'if_ndx_out'}, - {'check': flowd.FIELD_GATEWAY_ADDR, 'target': 'gateway_addr'}, - {'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}] +from lib.flowparser import FlowParser class Interfaces(object): @@ -87,30 +71,15 @@ def parse_flow(recv_stamp, flowd_source='/var/log/flowd.log'): if parse_done: # log file contains older data (recv_stamp), break break - flog = flowd.FlowLog(filename) - for flow in flog: - flow_record = dict() - if flow.has_field(flowd.FIELD_RECV_TIME): - # receive timestamp - flow_record['recv'] = flow.recv_sec - if flow_record['recv'] <= recv_stamp: - # do not parse next flow archive (oldest reached) - parse_done = True - continue - if flow.has_field(flowd.FIELD_FLOW_TIMES): - # calculate flow start, end, duration in ms - flow_record['flow_end'] = flow.recv_sec - (flow.sys_uptime_ms - flow.flow_finish) / 1000.0 - flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start) - flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms'] / 1000.0 - # handle source data - for flow_field in PARSE_FLOW_FIELDS: - if flow.has_field(flow_field['check']): - flow_record[flow_field['target']] = getattr(flow, flow_field['target']) - else: - flow_record[flow_field['target']] = None - # map interface indexes to actual interface names - flow_record['if_in'] = interfaces.if_device(flow_record['if_ndx_in']) - flow_record['if_out'] = interfaces.if_device(flow_record['if_ndx_out']) - yield flow_record + for flow_record in FlowParser(filename): + if flow_record['recv_sec'] <= recv_stamp: + # do not parse next flow archive (oldest reached) + parse_done = True + continue + # map interface indexes to actual interface names + flow_record['if_in'] = interfaces.if_device(flow_record['if_ndx_in']) + flow_record['if_out'] = interfaces.if_device(flow_record['if_ndx_out']) + + yield flow_record # send None to mark last record yield None