Analyzing Trace Data Using Apache Spark

This demo uses Apache Spark to analyze network trace data. It is a good example of Spark in practice and walks through several aspects of it, including distributing the processing across a cluster of nodes. It uses:

  • File reader with partitions
  • Data frame creation
  • Filter and map functions
  • Window functions
  • User-defined functions (UDFs)
  • Graph plots

The Trace File
The trace data is read from a file that records how a router handled each packet. In particular, it contains metadata describing how the flow table and NBAR2 processed each packet.

The Analysis
The analysis performs a few tasks:

  • Parse packets from a file
  • Create a data frame with the packet info
  • Create network 5-tuple for the packets
  • Generate histograms and graphs

The analysis is written in a Jupyter Notebook using the PySpark shell, with Apache Spark running on top of a two-node Hadoop cluster managed by YARN.

About me

I'm a system/software architect working for the Core Software Group at Cisco Systems, with many years of experience in software development, management, and architecture. I have worked on various network services products in the areas of security, media, application recognition, visibility, and control. I hold a Master of Science degree in electrical engineering and business management from Tel Aviv University, and I always like to learn and experiment with new technologies and ideas. More can be found on my website: http://nir.bendvora.com/Work

Importing the necessary Python libraries

# Show / Hide code Button
from IPython.display import HTML
HTML('''<button class="toggle">Show/Hide Code</button>''')
import re
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sys
%matplotlib notebook
sc

SparkContext

Version: v2.2.0
Master: yarn
AppName: pyspark-shell

Spark UI Environment

HTML('''<a href="./sparkEnvironment.JPG" target="_blank"><img src="./sparkEnvironment.JPG" 
alt="Spark Environment" width="500" height="245">
</a>Click for full size''')
# Config to display all pandas columns
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

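Reading the trace file

The trace file, "nirbd_trace_nov_8.txt", is stored in HDFS. It is read with newAPIHadoopFile using TextInputFormat with a custom record delimiter ("Packet: "), so each record holds the text of one packet rather than one line. The resulting (offset, text) pairs are then hash-partitioned into 50 RDD partitions with partitionBy.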

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
trace = sc.newAPIHadoopFile(
    'hdfs://hadoop1:54310/user/hduser/trace_analysis/nirbd_trace_nov_8.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Packet: '}).partitionBy(50)
print "Check partition result:\n Num Partitions=", trace.getNumPartitions(), \
    "\n Partitioner=", trace.partitioner, \
    "\n Partition Function=", trace.partitioner.partitionFunc
trace_values = trace.values()
print "Number of packets read: ", trace_values.count()
Check partition result:
 Num Partitions= 50 
 Partitioner= <pyspark.rdd.Partitioner object at 0x7f82874010d0> 
 Partition Function= <function portable_hash at 0x7f82a40f4a28>
Number of packets read:  8193
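To illustrate what the custom record delimiter does, here is a minimal pure-Python sketch that splits a small, made-up trace text on "Packet: ", roughly the way TextInputFormat does when textinputformat.record.delimiter is set. The sample text and the split_records helper are hypothetical; the point is that each record holds one packet (without the delimiter), and that any text before the first delimiter becomes a record of its own.

```python
# Hypothetical miniature trace; the real file is much larger
trace_text = (
    "Trace header line\n"
    "Packet: 1  CBUG ID: 100\n  Summary ...\n"
    "Packet: 2  CBUG ID: 101\n  Summary ...\n"
)

DELIMITER = "Packet: "

def split_records(text, delimiter):
    """Split text into records on a custom delimiter.

    The delimiter itself is not part of any record, which is roughly how
    TextInputFormat behaves with textinputformat.record.delimiter set.
    """
    return text.split(delimiter)

records = split_records(trace_text, DELIMITER)
# records[0] is the text before the first delimiter (not a packet);
# records[1:] hold one packet each, starting with the packet number
```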

Filter valid packets

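Use the filter transformation to keep only valid packet records, i.e. records that start with a packet number followed by "CBUG ID". The filter runs on each partition independently, so all 50 partitions are kept. The partitioner is reported as None because values(), a plain map over the key-value pairs, already discarded the hash partitioner set by partitionBy.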

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
def filter_valid(r):
    # A valid packet record starts with its packet number followed by "CBUG ID"
    return re.match(r"^\d+\s+CBUG ID", r) is not None
valid = trace_values.filter(filter_valid)
print "Num Partitions=", valid.getNumPartitions(), \
    "\nPartitioner=", valid.partitioner, \
    "\nNumber of valid packets:", valid.count()
Num Partitions= 50 
Partitioner= None 
Number of valid packets: 8192
#t = valid.take(10)
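The validity check can also be written with a precompiled regular expression. This is a small sketch using hypothetical sample records and a hypothetical name, is_valid_record; on the cluster the same predicate would be passed to RDD.filter exactly like filter_valid.

```python
import re

# Same pattern as filter_valid: a packet number, whitespace, then "CBUG ID"
# at the very start of the record. Compiled once and reused for every record.
VALID_RECORD = re.compile(r"^\d+\s+CBUG ID")

def is_valid_record(record):
    """Return True if the record looks like a packet record."""
    return VALID_RECORD.match(record) is not None

# Hypothetical sample records
samples = [
    "Trace header line\n",
    "1  CBUG ID: 100\n  Summary ...\n",
    "42\tCBUG ID: 7\n",
    "CBUG ID: 5\n",
]
valid_samples = [s for s in samples if is_valid_record(s)]
```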

Parsing packets

Relevant fields are extracted from the text of each packet with regular expressions. parse_pkt is applied to every packet record with map and returns one Row per packet, so the result can be converted into a DataFrame.

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
def parse_pkt(pkt):
    pkt_num = int(re.match("^(\d+)\s+CBUG ID",pkt).group(1))
    r = re.search("Summary\n  Input     : (.*)\n  Output    : (.*)\n  State     :.*\n  Timestamp\n    Start   : (\d+) ns", pkt)
    in_if = r.group(1)
    out_if = r.group(2)
    ts_start = long(r.group(3))
    r = re.search("  Feature: CFT\n    API.*\n.*\n.*\n.*\n    direction             : (.*)\n.*\n.*\n.*\n.*\n.*\n    cft_l3_payload_size   : (\d+)\n.*\n.*\n    tuple.src_ip          : (.*)\n    tuple.dst_ip          : (.*)\n    tuple.src_port        : (\d+)\n    tuple.dst_port        : (\d+)\n    tuple.vrfid           : (\d+)\n    tuple.l4_protocol     : (.*)\n    tuple.l3_protocol     : (.*)\n", pkt)
    direction = r.group(1)
    size = int(r.group(2))
    src_ip = r.group(3)
    dst_ip = r.group(4)
    src_port = r.group(5)
    dst_port = r.group(6)
    vrf = int(r.group(7))
    protocol = r.group(8)
    ip_ver = r.group(9)
    r = re.search("  Feature: NBAR\n    Packet number in flow: (.*)\n    Classification state: (.*)\n    Classification name: (.*)\n    Classification ID: .*\n    Number of matched sub-classifications: (\d+)\n    Number of extracted fields: (\d+)\n", pkt)
    if r.group(1) == "N/A": pkt_in_flow = -1
    else: pkt_in_flow = int(r.group(1))
    cls_state = r.group(2)
    app = r.group(3)
    num_subcls = int(r.group(4))
    num_exfld = int(r.group(5))
    r = re.search("IPV4_INPUT_STILE_LEGACY\n    Lapsed time : (\d+) ns", pkt)
    if r == None: nbar_in_cycles = -1
    else: nbar_in_cycles = int(r.group(1))
    r = re.search("IPV4_OUTPUT_STILE_LEGACY\n    Lapsed time : (\d+) ns", pkt)
    if r == None: nbar_out_cycles = -1
    else: nbar_out_cycles = int(r.group(1))
    return Row(pkt_num=pkt_num, in_if=in_if, out_if=out_if, ts_start=ts_start, 
               direction=direction, size=size, src_ip=src_ip, dst_ip=dst_ip, src_port=src_port, dst_port=dst_port,
               vrf=vrf, protocol=protocol, ip_ver=ip_ver,
               pkt_in_flow=pkt_in_flow, cls_state=cls_state, app=app, num_subcls=num_subcls, num_exfld=num_exfld,
               nbar_in_cycles=nbar_in_cycles, nbar_out_cycles=nbar_out_cycles)
#parse_pkt(t[1])
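The NBAR part of parse_pkt can be tried in isolation. The sketch below reuses the NBAR regular expression from parse_pkt on a hypothetical packet snippet (the field values are made up), maps "N/A" to -1 as parse_pkt does, and, unlike parse_pkt, returns None when the section is missing instead of raising AttributeError. The name parse_nbar is illustrative only.

```python
import re

# Same pattern as the NBAR search in parse_pkt, written as raw strings
NBAR_RE = re.compile(
    r"  Feature: NBAR\n"
    r"    Packet number in flow: (.*)\n"
    r"    Classification state: (.*)\n"
    r"    Classification name: (.*)\n"
    r"    Classification ID: .*\n"
    r"    Number of matched sub-classifications: (\d+)\n"
    r"    Number of extracted fields: (\d+)\n"
)

def parse_nbar(pkt):
    """Extract the NBAR fields of one packet record, or None if absent."""
    m = NBAR_RE.search(pkt)
    if m is None:
        return None
    # "N/A" means no packet number in flow; encode it as -1 like parse_pkt
    pkt_in_flow = -1 if m.group(1) == "N/A" else int(m.group(1))
    return {
        "pkt_in_flow": pkt_in_flow,
        "cls_state": m.group(2),
        "app": m.group(3),
        "num_subcls": int(m.group(4)),
        "num_exfld": int(m.group(5)),
    }

# Hypothetical packet snippet in the format the regex expects
snippet = (
    "  Feature: NBAR\n"
    "    Packet number in flow: N/A\n"
    "    Classification state: Final\n"
    "    Classification name: ssl\n"
    "    Classification ID: 1234\n"
    "    Number of matched sub-classifications: 0\n"
    "    Number of extracted fields: 0\n"
)
```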

Parse the entire trace file

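The resulting DataFrame is cached in memory because the following queries reuse it. Note that cache() is lazy: the data is materialized in memory by the first action that uses pkts.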

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
pkts = valid.map(parse_pkt).toDF() \
       .select('pkt_num', 'ts_start', 'protocol', 'src_ip', 'dst_ip', 'src_port', 'dst_port', 'size', 'in_if', 
                'out_if', 'app', 'cls_state', 'pkt_in_flow', 'nbar_in_cycles', 'nbar_out_cycles', 
                'num_subcls', 'num_exfld' ,'ip_ver', 'direction', 'vrf') \
       .cache()

pkts Schema

pkts.printSchema()
root
 |-- pkt_num: long (nullable = true)
 |-- ts_start: long (nullable = true)
 |-- protocol: string (nullable = true)
 |-- src_ip: string (nullable = true)
 |-- dst_ip: string (nullable = true)
 |-- src_port: string (nullable = true)
 |-- dst_port: string (nullable = true)
 |-- size: long (nullable = true)
 |-- in_if: string (nullable = true)
 |-- out_if: string (nullable = true)
 |-- app: string (nullable = true)
 |-- cls_state: string (nullable = true)
 |-- pkt_in_flow: long (nullable = true)
 |-- nbar_in_cycles: long (nullable = true)
 |-- nbar_out_cycles: long (nullable = true)
 |-- num_subcls: long (nullable = true)
 |-- num_exfld: long (nullable = true)
 |-- ip_ver: string (nullable = true)
 |-- direction: string (nullable = true)
 |-- vrf: long (nullable = true)

Show unique applications in the trace
# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
# Unique Applications
pkts.select('app').distinct().orderBy('app').toPandas()
app
0 capwap-data
1 cifs
2 cisco-collab-audio
3 cisco-collab-control
4 cisco-collab-video
5 cisco-collaboration
6 cisco-jabber-control
7 cisco-jabber-im
8 cisco-phone-audio
9 cisco-spark
10 cisco-spark-media
11 citrix
12 conference-server
13 crashplan
14 db-service
15 dns
16 exchange
17 google-services
18 gtalk-video
19 http
20 http-local-net
21 internet-audio-streaming
22 ipsec
23 mongo
24 mpeg2-ts
25 nfs
26 ntp
27 ping
28 rtcp
29 rtp
30 rtp-audio
31 rtp-video
32 samsung
33 skype
34 snmp
35 sqlserver
36 ssh
37 ssl
38 ssl-local-net
39 statistical-conf-audio
40 statistical-conf-video
41 statistical-download
42 statistical-p2p
43 stun-nat
44 sunrpc
45 syslog
46 telepresence-audio
47 telepresence-control
48 telepresence-media
49 telnet
50 unknown
51 vnc
52 web-rtc
53 webex-app-sharing
54 webex-media
55 webex-meeting

Display the 10 packets with the highest packet number within their flow (pkt_in_flow > 100)

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
pkts.where(pkts['pkt_in_flow']>100).orderBy('pkt_in_flow', ascending=False).limit(10).toPandas()
pkt_num ts_start protocol src_ip dst_ip src_port dst_port size in_if out_if app cls_state pkt_in_flow nbar_in_cycles nbar_out_cycles num_subcls num_exfld ip_ver direction vrf
0 3553 12197251769463 TCP 10.42.23.18 10.42.140.18 54950 5070 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 21020 -1 16106 0 0 IPV4 Output 0
1 5189 12197263430726 TCP 173.36.65.148 10.42.23.14 53987 5611 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 1542 -1 16293 0 0 IPV4 Output 0
2 986 12197234262861 TCP 171.70.146.222 10.41.63.42 5060 51561 718 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 300 -1 13626 0 0 IPV4 Output 0
3 911 12197233724501 TCP 10.41.63.42 171.70.146.222 51561 5060 32 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collab-control Final 300 34186 16106 0 0 IPV4 Input 0
4 844 12197233217678 TCP 10.41.63.42 171.70.146.222 51561 5060 32 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collab-control Final 299 33706 16920 0 0 IPV4 Input 0
5 889 12197233514408 TCP 171.70.146.222 10.41.63.42 5060 51561 377 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 299 -1 17360 0 0 IPV4 Output 0
6 624 12197232065971 TCP 10.41.63.42 171.70.146.222 51561 5060 1042 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collab-control Final 298 877600 15106 0 0 IPV4 Input 0
7 5444 12197265071106 TCP 10.42.23.12 10.118.41.24 5060 46865 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 295 -1 16626 0 0 IPV4 Output 0
8 4328 12197258052563 TCP 10.42.23.12 10.118.41.24 5060 46865 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 295 -1 16040 0 0 IPV4 Output 0
9 8128 12197281778071 TCP 173.36.131.162 10.41.62.36 5060 49203 400 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 288 -1 15653 0 0 IPV4 Output 0

Display, for each application, the maximum non-bypass packet number (only packets with pkt_in_flow > 0 are considered; -1 marks N/A in the trace)

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
pkts.filter(pkts['pkt_in_flow']>0).groupBy('app').agg(F.max('pkt_in_flow').alias('max non-bypass pkts')).toPandas()
app max non-bypass pkts
0 db-service 1
1 nfs 1
2 telepresence-media 1
3 cisco-spark-media 40
4 unknown 24
5 dns 4
6 skype 1
7 http 2
8 stun-nat 42
9 cisco-collab-control 21020
10 conference-server 1
11 ntp 1
12 telepresence-control 8
13 cisco-jabber-control 1
14 ssl-local-net 1
15 cisco-collaboration 40

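Display, for each application, the 2 packets with the highest non-bypass packet number

This uses a window partitioned by application and ordered by pkt_in_flow in descending order: row_number() numbers the packets within each application, and only the first NUM_PER_APP rows are kept. Only packets with pkt_in_flow > MIN_PKT_FLOW are considered.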

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
# Show NUM_PER_APP of packets with max packets_in_flow where pkt_in_flow > MIN_PKT_FLOW
NUM_PER_APP = 2
MIN_PKT_FLOW = 10
window = Window.partitionBy(pkts['app']).orderBy(pkts['pkt_in_flow'].desc())
pkts.where(pkts['pkt_in_flow']>MIN_PKT_FLOW).orderBy('app').select(F.col('*'), F.row_number().over(window) \
            .alias('row_number')).where(F.col('row_number') <= NUM_PER_APP).drop('row_number').limit(20).toPandas()
pkt_num ts_start protocol src_ip dst_ip src_port dst_port size in_if out_if app cls_state pkt_in_flow nbar_in_cycles nbar_out_cycles num_subcls num_exfld ip_ver direction vrf
0 3553 12197251769463 TCP 10.42.23.18 10.42.140.18 54950 5070 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 21020 -1 16106 0 0 IPV4 Output 0
1 5189 12197263430726 TCP 173.36.65.148 10.42.23.14 53987 5611 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collab-control Final 1542 -1 16293 0 0 IPV4 Output 0
2 2991 12197247032243 UDP 10.35.150.17 10.92.110.178 5004 51760 80 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Not final 40 184160 16386 0 0 IPV4 Input 0
3 2935 12197246781996 UDP 10.92.110.178 10.35.150.17 51760 5004 124 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Not final 39 184213 17360 0 0 IPV4 Input 0
4 2993 12197247045308 UDP 10.35.150.17 10.92.110.178 33434 51761 80 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-spark-media Not final 40 171306 16413 0 0 IPV4 Input 0
5 2944 12197246841036 UDP 10.92.110.178 10.35.150.17 51761 33434 124 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-spark-media Not final 39 184533 15133 0 0 IPV4 Input 0
6 3018 12197247153461 TCP 10.35.150.17 10.92.110.178 33434 57257 94 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 stun-nat Not final 42 182120 17133 0 0 IPV4 Input 0
7 1050 12197234576968 TCP 10.92.110.178 10.35.150.17 57255 33434 20 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 stun-nat Not final 42 27773 16813 0 0 IPV4 Input 0
8 3136 12197248128088 TCP 10.42.23.133 10.42.140.200 5061 39528 20 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 unknown Not final 24 -1 16413 0 0 IPV4 Output 0
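The row_number() window pattern is the usual way to get the top N rows per group. As a plain-Python sketch of the same semantics (hypothetical helper top_n_per_group and made-up rows): filter, sort by group and by descending value, then keep the first N rows of each group.

```python
from itertools import groupby
from operator import itemgetter

def top_n_per_group(rows, group_key, order_key, n, min_value):
    """Keep up to n rows with the largest order_key in each group.

    Mirrors: where(order_key > min_value), then
    row_number() over (partition by group_key order by order_key desc) <= n.
    """
    eligible = [r for r in rows if r[order_key] > min_value]
    # Sort by group, then by order_key descending (the window ordering)
    eligible.sort(key=lambda r: (r[group_key], -r[order_key]))
    result = []
    for _, group in groupby(eligible, key=itemgetter(group_key)):
        for row_number, row in enumerate(group, start=1):
            if row_number > n:
                break
            result.append(row)
    return result

# Made-up rows with only the columns the query uses
rows = [
    {"app": "stun-nat", "pkt_in_flow": 42},
    {"app": "stun-nat", "pkt_in_flow": 17},
    {"app": "stun-nat", "pkt_in_flow": 42},
    {"app": "unknown", "pkt_in_flow": 24},
    {"app": "dns", "pkt_in_flow": 4},
]
top = top_n_per_group(rows, "app", "pkt_in_flow", n=2, min_value=10)
```

Like row_number(), this keeps exactly N rows per group even when values tie; using rank() in the Spark query instead would keep all tied rows.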

Plot histograms of packet size and NBAR2 input/output cycles

The histogram buckets are counted in Spark with RDD.histogram, and only the bucket counts are brought back to the driver, where matplotlib plots them.

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
# Plot a histogram of a DataFrame column.
# The heavy lifting (bucket counting) is done by Spark; only the bucket
# counts are collected to the driver and plotted as a bar chart.
def plot_hist(df, col, nbins=50, range=None, title="", xlabel="Bin", ylabel="Count"):
    if range is None:
        # Buckets span 0 .. column maximum, so negative sentinels (-1) are not counted
        maxp = df.agg(F.max(col).alias('maxhist')).collect()[0]['maxhist']
        step = 1.0*maxp/nbins
        bins = np.arange(0, maxp+step, step)
    else:
        step = 1.0*(range[1]-range[0])/nbins
        bins = np.arange(range[0]-step, range[1]+step, step)
    # RDD.histogram skips None/NaN values and values outside the buckets
    edges, counts = df.select(col).rdd.flatMap(lambda x: x).histogram(list(bins))
    fig, ax = plt.subplots(figsize=(7, 5))
    # Draw each bar starting at the left edge of its bucket
    ax.bar(edges[:-1], counts, align='edge', width=step, color='steelblue', edgecolor='k')
    ax.xaxis.grid(True, linestyle=':', linewidth=0.5)
    ax.yaxis.grid(True, linestyle=':', linewidth=0.5)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    plt.tight_layout()
    
plot_hist(pkts, 'size', nbins=50, title="Packet Size Histogram", xlabel="Packet Size [B]", ylabel="Count")
plot_hist(pkts, 'nbar_in_cycles', nbins=50, title="Input Cycles Histogram", xlabel="Cycles [ns]", ylabel="Count")
plot_hist(pkts, 'nbar_out_cycles', nbins=50, title="Output Cycles Histogram", xlabel="Cycles [ns]", ylabel="Count")
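The bucket semantics of RDD.histogram matter when reading these plots: every bucket is half-open [lo, hi) except the last, which also includes hi, and None, NaN and out-of-range values are not counted. As a consequence, the -1 sentinel used for missing NBAR timings falls below the first edge (0) and does not appear in the cycles histograms. The pure-Python sketch below mimics those semantics; histogram_counts and uniform_edges are hypothetical helpers, not Spark APIs.

```python
import bisect
import math

def uniform_edges(lo, hi, nbins):
    """nbins equal-width buckets from lo to hi, built from integer indices."""
    step = (hi - lo) / float(nbins)
    return [lo + i * step for i in range(nbins + 1)]

def histogram_counts(values, buckets):
    """Count values per bucket with RDD.histogram-style semantics.

    buckets are sorted edges; each bucket is [lo, hi) except the last,
    which is [lo, hi]. None, NaN and out-of-range values are ignored.
    """
    counts = [0] * (len(buckets) - 1)
    lo, hi = buckets[0], buckets[-1]
    for v in values:
        if v is None or (isinstance(v, float) and math.isnan(v)):
            continue
        if v < lo or v > hi:
            continue  # e.g. the -1 sentinel when the first edge is 0
        if v == hi:
            idx = len(counts) - 1  # the last bucket is closed on the right
        else:
            idx = bisect.bisect_right(buckets, v) - 1
        counts[idx] += 1
    return counts

edges = uniform_edges(0, 30, 3)
counts = histogram_counts([0, 5, 10, 29.9, 30, -1, 31, None, float("nan")], edges)
```

uniform_edges builds the edges from integer indices instead of np.arange with a float step; NumPy's documentation notes that arange with a non-integer step can give inconsistent lengths, and numpy.linspace is the usual alternative.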

Add the flow each packet belongs to

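This is done with a UDF that computes a normalized, direction-independent flow key (the 5-tuple plus the VRF) for each packet, so both directions of a connection get the same key. Each distinct key is then given a sequential flow ID (FID) with row_number() over a window, and the ID is joined back to the packets. Because that window has no partition columns, Spark moves all distinct flows into a single partition to number them; this is acceptable here since only the much smaller set of distinct flows is involved.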

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
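# Normalized (direction-independent) flow for each packet: the endpoint
# (ip, port) that sorts higher always comes first, so both directions of a
# connection map to the same key, even when both endpoints share an IP
def normalize_flow(src_ip, dst_ip, src_port, dst_port, prot, vrf):
    if (src_ip, src_port) > (dst_ip, dst_port):
        return [(src_ip, dst_ip, src_port, dst_port, prot, vrf)]
    else:
        return [(dst_ip, src_ip, dst_port, src_port, prot, vrf)]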
nflow_schema = ArrayType(StructType([
    StructField("nsrc_ip", StringType(), False),
    StructField("ndst_ip", StringType(), False),
    StructField("nsrc_port", StringType(), False),
    StructField("ndst_port", StringType(), False),
    StructField("nprot", StringType(), False),
    StructField("nvrf", IntegerType(), False)
]))    
normalized_flow_udf = F.udf(normalize_flow, nflow_schema)

pkts_f = pkts.withColumn('norm-flow', normalized_flow_udf('src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol', 'vrf'))
# Assign a unique incremental number for each flow
nf_window = Window.partitionBy().orderBy('norm-flow')
norm_flow_id = pkts_f.select('norm-flow').dropDuplicates().select(F.col('*'), (F.row_number().over(nf_window) -1).alias('FID'))
# Assign the flow ID to each packet
pkts_fid = pkts_f.join(norm_flow_id, 'norm-flow').drop('norm-flow')
pkts_fid.limit(10).toPandas()
pkt_num ts_start protocol src_ip dst_ip src_port dst_port size in_if out_if app cls_state pkt_in_flow nbar_in_cycles nbar_out_cycles num_subcls num_exfld ip_ver direction vrf FID
0 120 12197228558373 UDP 10.42.23.31 66.163.34.139 51413 48633 114 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 conference-server Final -1 -1 13466 0 0 IPV4 Output 0 864
1 146 12197228760866 UDP 10.35.150.49 146.20.193.111 34436 5004 172 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18306 14360 0 0 IPV4 Input 0 605
2 155 12197228792661 TCP 10.35.150.16 10.24.21.177 5006 59719 32 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 stun-nat Final -1 19360 15160 0 0 IPV4 Input 0 27
3 192 12197228986481 UDP 173.38.197.51 10.42.23.32 42340 57742 1163 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 telepresence-media Final -1 -1 15733 0 0 IPV4 Output 0 768
4 316 12197229831953 UDP 128.107.82.105 10.35.209.92 50000 59032 190 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 telepresence-audio Final -1 -1 15720 0 0 IPV4 Output 0 484
5 358 12197230065206 UDP 10.35.209.92 128.107.82.105 50164 50000 1302 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 telepresence-media Final -1 -1 17053 0 0 IPV4 Output 0 476
6 375 12197230150043 UDP 128.107.82.105 10.35.150.26 50000 56840 190 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collaboration Final -1 -1 17253 0 0 IPV4 Output 0 465
7 450 12197230566818 UDP 10.22.76.42 10.35.150.49 52212 5006 1324 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 statistical-conf-audio Final -1 17493 14146 0 0 IPV4 Input 0 80
8 488 12197230905786 UDP 10.42.23.32 128.107.83.85 55070 36010 987 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 telepresence-media Final -1 -1 14680 0 0 IPV4 Output 0 595
9 674 12197232330151 UDP 128.107.201.136 128.107.82.105 57522 50000 1340 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 telepresence-media Final -1 -1 15653 0 0 IPV4 Output 0 503
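The flow normalization and numbering can be checked without Spark. This sketch (hypothetical helper assign_flow_ids) returns the normalized key as a plain tuple rather than the one-element array used by the UDF, and numbers the distinct keys in sorted order, which is what row_number() - 1 over the window does. The sample packets are the two directions of the NFS flow between 10.122.72.87:695 and 10.32.134.106:2049, plus one packet of the UDP flow between 10.35.150.26:53156 and 128.107.82.105:50000, both of which appear in this notebook's output.

```python
def normalize_flow(src_ip, dst_ip, src_port, dst_port, prot, vrf):
    """Direction-independent flow key: the (ip, port) endpoint that sorts
    higher is always placed first, so A->B and B->A give the same key."""
    if (src_ip, src_port) > (dst_ip, dst_port):
        return (src_ip, dst_ip, src_port, dst_port, prot, vrf)
    return (dst_ip, src_ip, dst_port, src_port, prot, vrf)

def assign_flow_ids(packets):
    """Give each distinct normalized flow a sequential ID (0, 1, ...) in
    sorted key order, like row_number() - 1 over the distinct flows."""
    keys = sorted(set(normalize_flow(*p) for p in packets))
    return dict((key, fid) for fid, key in enumerate(keys))

# (src_ip, dst_ip, src_port, dst_port, protocol, vrf); ports are strings
# because they are strings in the pkts DataFrame
nfs_request = ("10.122.72.87", "10.32.134.106", "695", "2049", "TCP", 0)
nfs_reply = ("10.32.134.106", "10.122.72.87", "2049", "695", "TCP", 0)
udp_packet = ("10.35.150.26", "128.107.82.105", "53156", "50000", "UDP", 0)

flow_ids = assign_flow_ids([nfs_request, nfs_reply, udp_packet])
```

IPs and ports are compared as strings, so the order is lexicographic rather than numeric; that is fine here because the key only needs to be canonical, not meaningful.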

Display all packets that belong to flow number 0

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
pkts_fid.where(pkts_fid['FID'] == 0).toPandas()
pkt_num ts_start protocol src_ip dst_ip src_port dst_port size in_if out_if app cls_state pkt_in_flow nbar_in_cycles nbar_out_cycles num_subcls num_exfld ip_ver direction vrf FID
0 4319 12197257994418 TCP 10.122.72.87 10.32.134.106 695 2049 248 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 nfs Final -1 18306 15053 0 0 IPV4 Input 0 0
1 4337 12197258152738 TCP 10.32.134.106 10.122.72.87 2049 695 32 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 nfs Final -1 18440 16013 0 0 IPV4 Input 0 0
2 7637 12197278624433 TCP 10.122.72.87 10.32.134.106 695 2049 156 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 nfs Final -1 17426 14626 0 0 IPV4 Input 0 0
3 4485 12197259089056 TCP 10.32.134.106 10.122.72.87 2049 695 196 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 nfs Final -1 18040 15173 0 0 IPV4 Input 0 0
4 7686 12197278785581 TCP 10.32.134.106 10.122.72.87 2049 695 32 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 nfs Final -1 18053 14386 0 0 IPV4 Input 0 0
5 7698 12197278844508 TCP 10.32.134.106 10.122.72.87 2049 695 272 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 nfs Final -1 19013 15013 0 0 IPV4 Input 0 0

Calculate, for each packet, its position in the flow, the Inter Packet Gap (IPG) from the previous packet of the same flow, and whether it was sent by the flow initiator

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
# For each packet: position in its flow, IPG (ns) from the previous packet of the flow,
# and whether it comes from the initiator (the src ip/port of the flow's first packet)
fp_window = Window.partitionBy('FID').orderBy('ts_start')
pkts_flow = pkts_fid.select(F.col('*'),
            F.row_number().over(fp_window).alias('pkt_num_in_flow'), 
            ((pkts_fid['ts_start'] - F.lag(pkts_fid['ts_start']).over(fp_window))).alias('IPG'),
            ((pkts_fid['src_ip']==F.first(pkts_fid['src_ip']).over(fp_window)) & (pkts_fid['src_port']==F.first('src_port').over(fp_window))).alias('Initiator')) \
            .cache()
pkts_flow.limit(20).toPandas()
pkt_num ts_start protocol src_ip dst_ip src_port dst_port size in_if out_if app cls_state pkt_in_flow nbar_in_cycles nbar_out_cycles num_subcls num_exfld ip_ver direction vrf FID pkt_num_in_flow IPG Initiator
0 2242 12197242005911 TCP 10.19.195.198 10.35.209.71 44414 443 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 ssl-local-net Final -1 -1 14520 0 0 IPV4 Output 0 148 1 NaN True
1 3131 12197248087698 TCP 10.19.195.198 10.35.209.71 44414 443 179 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 ssl-local-net Final -1 -1 16720 0 0 IPV4 Output 0 148 2 6081787.0 True
2 3155 12197248236108 TCP 10.35.209.71 10.19.195.198 443 44414 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 ssl-local-net Final -1 -1 16346 0 0 IPV4 Output 0 148 3 148410.0 False
3 3965 12197255391233 TCP 10.35.209.71 10.19.195.198 443 44414 32 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 ssl-local-net Final -1 -1 16866 0 0 IPV4 Output 0 148 4 7155125.0 False
4 48 12197228029678 UDP 128.107.82.105 10.35.150.26 50000 53156 95 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collaboration Final -1 -1 15493 0 0 IPV4 Output 0 463 1 NaN True
5 81 12197228281846 UDP 10.35.150.26 128.107.82.105 53156 50000 1022 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18720 15106 0 0 IPV4 Input 0 463 2 252168.0 False
6 101 12197228407851 UDP 10.35.150.26 128.107.82.105 53156 50000 1257 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 17133 14360 0 0 IPV4 Input 0 463 3 126005.0 False
7 479 12197230846768 UDP 10.35.150.26 128.107.82.105 53156 50000 1272 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 19173 15560 0 0 IPV4 Input 0 463 4 2438917.0 False
8 950 12197234082306 UDP 10.35.150.26 128.107.82.105 53156 50000 1288 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 17720 13720 0 0 IPV4 Input 0 463 5 3235538.0 False
9 1491 12197237408736 UDP 10.35.150.26 128.107.82.105 53156 50000 1289 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18413 15933 0 0 IPV4 Input 0 463 6 3326430.0 False
10 1993 12197240619696 UDP 10.35.150.26 128.107.82.105 53156 50000 456 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 17413 14706 0 0 IPV4 Input 0 463 7 3210960.0 False
11 3549 12197251667853 UDP 10.35.150.26 128.107.82.105 53156 50000 1287 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18760 15080 0 0 IPV4 Input 0 463 8 11048157.0 False
12 3995 12197255608356 UDP 10.35.150.26 128.107.82.105 53156 50000 1273 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18826 15600 0 0 IPV4 Input 0 463 9 3940503.0 False
13 4532 12197259502148 UDP 10.35.150.26 128.107.82.105 53156 50000 1292 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 17693 13800 0 0 IPV4 Input 0 463 10 3893792.0 False
14 4702 12197260774888 UDP 128.107.82.105 10.35.150.26 50000 53156 153 TenGigabitEthernet1/1/0 TenGigabitEthernet1/0/0 cisco-collaboration Final -1 -1 13520 0 0 IPV4 Output 0 463 11 1272740.0 True
15 4777 12197261296131 UDP 10.35.150.26 128.107.82.105 53156 50000 920 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 17680 13546 0 0 IPV4 Input 0 463 12 521243.0 False
16 4802 12197261459021 UDP 10.35.150.26 128.107.82.105 53156 50000 1271 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 19120 14986 0 0 IPV4 Input 0 463 13 162890.0 False
17 5365 12197264484618 UDP 10.35.150.26 128.107.82.105 53156 50000 1290 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18680 15573 0 0 IPV4 Input 0 463 14 3025597.0 False
18 5867 12197267775378 UDP 10.35.150.26 128.107.82.105 53156 50000 1289 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 18960 16093 0 0 IPV4 Input 0 463 15 3290760.0 False
19 6351 12197271155431 UDP 10.35.150.26 128.107.82.105 53156 50000 1290 TenGigabitEthernet1/0/0 TenGigabitEthernet1/1/0 cisco-collaboration Final -1 19173 16026 0 0 IPV4 Input 0 463 16 3380053.0 False
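The three window expressions can be mirrored in plain Python to make their semantics explicit: row_number() is the 1-based position of a packet in its flow ordered by ts_start, lag() gives the previous timestamp (null for the first packet, which toPandas shows as NaN), and the initiator is whoever sent the first packet of the flow. The sketch below uses the four packets of flow 148 from the output above, keeping only the relevant columns and feeding them in reverse order; flow_stats is a hypothetical helper.

```python
from itertools import groupby
from operator import itemgetter

def flow_stats(packets):
    """Add pkt_num_in_flow, IPG and Initiator to each packet dict.

    Mirrors row_number(), ts_start - lag(ts_start) and the first-packet
    comparison over a window partitioned by FID and ordered by ts_start.
    """
    out = []
    ordered = sorted(packets, key=itemgetter("FID", "ts_start"))
    for _, flow in groupby(ordered, key=itemgetter("FID")):
        first = None
        prev_ts = None
        for pos, pkt in enumerate(flow, start=1):
            if first is None:
                first = pkt
            row = dict(pkt)
            row["pkt_num_in_flow"] = pos
            # No previous packet for the first one: None, like lag() returning null
            row["IPG"] = None if prev_ts is None else pkt["ts_start"] - prev_ts
            row["Initiator"] = (pkt["src_ip"] == first["src_ip"]
                                and pkt["src_port"] == first["src_port"])
            prev_ts = pkt["ts_start"]
            out.append(row)
    return out

# Flow 148 from the output above, deliberately out of order
flow_148 = [
    {"FID": 148, "ts_start": 12197255391233, "src_ip": "10.35.209.71", "src_port": "443"},
    {"FID": 148, "ts_start": 12197248236108, "src_ip": "10.35.209.71", "src_port": "443"},
    {"FID": 148, "ts_start": 12197248087698, "src_ip": "10.19.195.198", "src_port": "44414"},
    {"FID": 148, "ts_start": 12197242005911, "src_ip": "10.19.195.198", "src_port": "44414"},
]
stats = flow_stats(flow_148)
```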

Plot the Inter Packet Gap (IPG) Histogram

# Show / Hide code Button
HTML('''<button class="toggle">Show/Hide Code</button>''')
plot_hist(pkts_flow, 'IPG', nbins=50, title="IPG Histogram", xlabel="IPG [ns]", ylabel="Count", range=[0,40000])

Spark UI DAG

This is the Spark UI DAG of the job that computed the last histogram, showing how the data flows through it. The job contains three stages, each with a few map operations.

HTML('''<a href="./jobDAG.jpg" target="_blank"><img src="./jobDAG.jpg" 
alt="Spark Job DAG" width="500" height="644">
</a>Click for full size''')
# TBD: Add to nbconvert full html as a script
HTML('''
<style>
button {
    background-color: #4CAF50;
    border: none;
    color: #FFF;
    padding: 5px 20px;
    text-align: center;
    text-decoration: none;
    display: inline-block;
    font-size: 16px;
}
button:hover {
    box-shadow: 0 8px 16px 0 rgba(0,0,0,0.2), 0 6px 20px 0 rgba(0,0,0,0.19);
}
</style>
    
<script>
  $( document ).ready(function(){
    $('.toggle').parent().parent().parent().parent().prev().hide();
    $('div.input').hide();
    $('.pre_code').hide();
  });
  
$('.toggle').click(function(){
    e1 = $(this).parent().parent().parent().parent().parent().next().find('.input');
    if (e1.is(":visible") == true) {
       e1.hide('500');//hide parent 
    } else {
       e1.show('500');//show parent 
    }
    e2 = $(this).parent().parent().parent().parent().parent().next().find('.pre_code');
    if (e2.is(":visible") == true) {
       e2.hide('500');//hide parent 
    } else {
       e2.show('500');//show parent 
    }   
});  
</script>

<div class="toggle"></div>
''')