From b34f61cb02165997471ec8fa8551529b46d13bd4 Mon Sep 17 00:00:00 2001 From: Ad Schellevis Date: Wed, 30 Mar 2016 19:44:55 +0200 Subject: [PATCH] (netflow) style fixes --- src/opnsense/scripts/netflow/flowctl_stats.py | 20 +++++++------- src/opnsense/scripts/netflow/lib/aggregate.py | 27 +++++++++---------- src/opnsense/scripts/netflow/lib/parse.py | 7 ++--- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/src/opnsense/scripts/netflow/flowctl_stats.py b/src/opnsense/scripts/netflow/flowctl_stats.py index 8ad8fe68f..57d771d4c 100755 --- a/src/opnsense/scripts/netflow/flowctl_stats.py +++ b/src/opnsense/scripts/netflow/flowctl_stats.py @@ -36,7 +36,7 @@ import ujson if __name__ == '__main__': result = dict() - netflow_nodes=list() + netflow_nodes = list() with tempfile.NamedTemporaryFile() as output_stream: subprocess.call(['/usr/sbin/ngctl', 'list'], stdout=output_stream, stderr=open(os.devnull, 'wb')) output_stream.seek(0) @@ -45,23 +45,23 @@ if __name__ == '__main__': netflow_nodes.append(line.split()[1]) for netflow_node in netflow_nodes: - node_stats={'SrcIPaddress': list(), 'DstIPaddress': list(), 'Pkts': 0} + node_stats = {'SrcIPaddress': list(), 'DstIPaddress': list(), 'Pkts': 0} with tempfile.NamedTemporaryFile() as output_stream: - subprocess.call(['/usr/sbin/flowctl','%s:'%netflow_node, 'show'], - stdout=output_stream, stderr=open(os.devnull, 'wb')) + subprocess.call(['/usr/sbin/flowctl', '%s:' % netflow_node, 'show'], + stdout=output_stream, stderr=open(os.devnull, 'wb')) output_stream.seek(0) for line in output_stream.read().split('\n'): - fields=line.split() - if (len(fields) >= 8 and fields[0] != 'SrcIf'): + fields = line.split() + if len(fields) >= 8 and fields[0] != 'SrcIf': node_stats['Pkts'] += int(fields[7]) if fields[1] not in node_stats['SrcIPaddress']: node_stats['SrcIPaddress'].append(fields[1]) if fields[3] not in node_stats['DstIPaddress']: node_stats['DstIPaddress'].append(fields[3]) - result[netflow_node]={'Pkts': node_stats['Pkts'], - 'if': netflow_node[8:], - 'SrcIPaddresses': len(node_stats['SrcIPaddress']), - 'DstIPaddresses': len(node_stats['DstIPaddress'])} + result[netflow_node] = {'Pkts': node_stats['Pkts'], + 'if': netflow_node[8:], + 'SrcIPaddresses': len(node_stats['SrcIPaddress']), + 'DstIPaddresses': len(node_stats['DstIPaddress'])} # handle command line argument (type selection) if len(sys.argv) > 1 and 'json' in sys.argv: diff --git a/src/opnsense/scripts/netflow/lib/aggregate.py b/src/opnsense/scripts/netflow/lib/aggregate.py index cb33e0c3e..85612fe5b 100644 --- a/src/opnsense/scripts/netflow/lib/aggregate.py +++ b/src/opnsense/scripts/netflow/lib/aggregate.py @@ -31,6 +31,7 @@ import os import datetime import sqlite3 + class BaseFlowAggregator(object): # target location ('/var/netflow/.sqlite') target_filename = None @@ -48,20 +49,17 @@ class BaseFlowAggregator(object): self._known_targets = list() # construct update and insert sql statements tmp = 'update %s set octets = octets + :octets_consumed, packets = packets + :packets_consumed ' - tmp = tmp + 'where mtime = :mtime and %s ' - self._update_stmt = tmp % (self._target_table_name, - ' and '.join(map(lambda x: '%s = :%s'%(x,x), - self.agg_fields))) + tmp += 'where mtime = :mtime and %s ' + self._update_stmt = tmp % (self._target_table_name, + ' and '.join(map(lambda x: '%s = :%s' % (x, x), self.agg_fields))) tmp = 'insert into %s (mtime, octets, packets, %s) values (:mtime, :octets_consumed, :packets_consumed, %s)' self._insert_stmt = tmp % (self._target_table_name, ','.join(self.agg_fields), - ','.join(map(lambda x: ':%s'%(x), - self.agg_fields))) + ','.join(map(lambda x: ':%s' % x, self.agg_fields))) # open database self._open_db() self._fetch_known_targets() - def _fetch_known_targets(self): """ read known target table names from the sqlite db """ @@ -79,7 +77,7 @@ class BaseFlowAggregator(object): if self._db_connection is not None: # construct new aggregate table sql_text = list() - sql_text.append('create table %s ( ' % self._target_table_name ) + sql_text.append('create table %s ( ' % self._target_table_name) sql_text.append(' mtime timestamp') for agg_field in self.agg_fields: sql_text.append(', %s varchar(255)' % agg_field) @@ -122,27 +120,28 @@ class BaseFlowAggregator(object): def add(self, flow): """ calculate timeslices per flow depending on sample resolution + :param flow: flow data (from parse.py) """ # make sure target exists if self._target_table_name not in self._known_targets: self._create_target_table() # push record(s) depending on resolution - start_time = int(flow['flow_start'] / self.resolution)*self.resolution + start_time = int(flow['flow_start'] / self.resolution) * self.resolution while start_time <= flow['flow_end']: consume_start_time = max(flow['flow_start'], start_time) consume_end_time = min(start_time + self.resolution, flow['flow_end']) if flow['duration_ms'] != 0: - consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms']/1000) + consume_perc = (consume_end_time - consume_start_time) / float(flow['duration_ms'] / 1000) else: consume_perc = 1 if self.is_db_open(): # upsert data - flow['octets_consumed'] = consume_perc*flow['octets'] - flow['packets_consumed'] = consume_perc*flow['packets'] + flow['octets_consumed'] = consume_perc * flow['octets'] + flow['packets_consumed'] = consume_perc * flow['packets'] flow['mtime'] = datetime.datetime.utcfromtimestamp(start_time) - result = self._update_cur.execute(self._update_stmt, flow) + self._update_cur.execute(self._update_stmt, flow) if self._update_cur.rowcount == 0: - result = self._update_cur.execute(self._insert_stmt, flow) + self._update_cur.execute(self._insert_stmt, flow) # next start time start_time += self.resolution diff --git a/src/opnsense/scripts/netflow/lib/parse.py b/src/opnsense/scripts/netflow/lib/parse.py index cf3ec6ea6..871551b6b 100644 --- a/src/opnsense/scripts/netflow/lib/parse.py +++ b/src/opnsense/scripts/netflow/lib/parse.py @@ -46,7 +46,8 @@ PARSE_FLOW_FIELDS = [ {'check': flowd.FIELD_FLOW_TIMES, 'target': 'netflow_ver'}] # location of flowd logfiles to use -FLOWD_LOG_FILES='/var/log/flowd.log*' +FLOWD_LOG_FILES = '/var/log/flowd.log*' + def parse_flow(recv_stamp): """ parse flowd logs and yield records (dict type) @@ -63,7 +64,7 @@ def parse_flow(recv_stamp): flow_record = dict() if flow.has_field(flowd.FIELD_RECV_TIME): # receive timestamp - flow_record['recv'] = flow.recv_sec + flow.recv_usec/1000.0 + flow_record['recv'] = flow.recv_sec + flow.recv_usec / 1000.0 if flow_record['recv'] <= recv_stamp: # do not parse next flow archive (oldest reached) parse_done = True @@ -72,7 +73,7 @@ def parse_flow(recv_stamp): # calculate flow start, end, duration in ms flow_record['flow_end'] = (flow.recv_sec - flow.flow_finish / 1000.0) flow_record['duration_ms'] = (flow.flow_finish - flow.flow_start) - flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms']/1000.0 + flow_record['flow_start'] = flow_record['flow_end'] - flow_record['duration_ms'] / 1000.0 # handle source data for flow_field in PARSE_FLOW_FIELDS: if flow.has_field(flow_field['check']):