def parse_flow(recv_stamp):
    """ parse flowd logs and yield records (dict type)

    Log files matching FLOWD_LOG_FILES are visited in sorted filename order.
    Every flow carrying a receive time newer than recv_stamp is converted
    into a dict and yielded. As soon as a flow at or before recv_stamp is
    seen, the remaining files are not opened (the rest of the current file
    is still scanned).

    Yielded records contain:
      - 'recv': receive timestamp in seconds (float, sub-second precision)
      - 'flow_start', 'flow_end', 'duration_ms': only when the flow has
        FIELD_FLOW_TIMES
      - one key per PARSE_FLOW_FIELDS entry ('target'), set to None when the
        flow does not carry the corresponding field

    :param recv_stamp: last receive timestamp (recv), older or equal flows are skipped
    :return: iterator flow details
    """
    parse_done = False
    for filename in sorted(glob.glob(FLOWD_LOG_FILES)):
        if parse_done:
            # log file contains older data (recv_stamp), break
            break
        for flow in flowd.FlowLog(filename):
            if not flow.has_field(flowd.FIELD_RECV_TIME):
                # without a receive time the flow cannot be compared to recv_stamp
                continue

            # receive timestamp: whole seconds plus the microsecond part
            # expressed in seconds (usec / 1e6, not / 1e3)
            recv = flow.recv_sec + flow.recv_usec / 1000000.0
            if recv <= recv_stamp:
                # do not parse next flow archive (oldest reached)
                parse_done = True
                continue

            flow_record = {'recv': recv}
            if flow.has_field(flowd.FIELD_FLOW_TIMES):
                # calculate flow start, end, duration in ms
                # NOTE(review): flow_finish is treated here as a millisecond
                # offset relative to recv_sec - confirm against the exporter's
                # uptime base
                duration_ms = flow.flow_finish - flow.flow_start
                flow_end = flow.recv_sec - flow.flow_finish / 1000.0
                flow_record['flow_end'] = flow_end
                flow_record['duration_ms'] = duration_ms
                flow_record['flow_start'] = flow_end - duration_ms / 1000.0

            # handle source data, absent fields are reported as None
            for flow_field in PARSE_FLOW_FIELDS:
                target = flow_field['target']
                if flow.has_field(flow_field['check']):
                    flow_record[target] = getattr(flow, target)
                else:
                    flow_record[target] = None

            yield flow_record