Network Trace Analysis Using Apache Flink

This is an example of network trace analysis using Apache Flink. It is similar to the analysis done with Apache Spark, except that here the job is written in Java and runs as a Flink job over a Hadoop cluster.
The Java code is given below:


package com.bendvora.flink;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.core.fs.FileSystem;

/* 
 * Example for batch processing:
 * - reading file with delimiter
 */
public class PacketTraceAnalysis {
  
  final static String tracePathFile = "hdfs://hadoop1:54310/user/hduser/trace_analysis/nirbd_trace_nov_8.txt"; 
  final static String fileDelimiter = "Packet: "; 

  @SuppressWarnings("serial")
  public static void main(String[] args) throws Exception {
    // get the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
    //env.setParallelism(2);
    
    DataSet<String> jsonRaw = env.readFileOfPrimitives(tracePathFile, fileDelimiter, String.class);
    
    DataSet<PktInfo> pkts = jsonRaw
      // Filter valid packets
      .filter(new FilterFunction<String>() {
          @Override
          public boolean filter(String pkt) throws Exception {
              return pkt.contains("CBUG ID");
          }
      })
      // Parse packet to PktInfo
      .map(new ParsePkt());
    
    // Print first 10 packets
    pkts
      .first(10)
      .print();
    
    // Create table from packet info
    Table pktst = tEnv.fromDataSet(pkts);
    tEnv.registerTable("PacketInfo", pktst);
    //pktst.printSchema();
    
    // Print unique applications
    Table unique_apps = pktst.select("app").distinct();
    DataSet<String> uapps = tEnv.toDataSet(unique_apps, String.class);
    uapps.print();
    
/*    // An alternative way to print the unique applications, using the DataSet API instead of the Table API
      pkts
      .map(new MapFunction<PktInfo, String>() {
        @Override
        public String map(PktInfo pkt) throws Exception {
          return pkt.getApp();
        }
      })
      .distinct()
      .print();
*/      
    
    // Add Normalized flow to each packet
    tEnv.registerFunction("normFlow", new NormFlow());
    Table pktst2 = pktst.select("*, normFlow(src_ip, dst_ip, src_port, dst_port, protocol, vrf) AS norm_flow");
    // Print unique flows
    Table unique_flows = pktst2.select("norm_flow").distinct();
    DataSet<String> uflows = tEnv.toDataSet(unique_flows, String.class);
    uflows.print();
    DataSet<Tuple2<String, Integer>> flow_fid = uflows
        .map(new AssignFID());
    flow_fid.print();
    
    Table flowst = tEnv.fromDataSet(flow_fid, "nflow, ffid");
    tEnv.registerTable("FlowInfo", flowst);
    
    Table pkts_fid = pktst2
        .leftOuterJoin(flowst)
        .where("norm_flow = nflow")
        .select("pkt_num, in_if, out_if, ts_start, direction, size, src_ip, dst_ip, src_port, dst_port, vrf, protocol, ip_ver, pkt_in_flow, cls_state, app, num_subcls, num_exfld, nbar_in_cycles, nbar_out_cycles, ffid")
        .as("pkt_num, in_if, out_if, ts_start, direction, size, src_ip, dst_ip, src_port, dst_port, vrf, protocol, ip_ver, pkt_in_flow, cls_state, app, num_subcls, num_exfld, nbar_in_cycles, nbar_out_cycles, fid");

    DataSet<PktInfoCSV> pfids = tEnv.toDataSet(pkts_fid, PktInfoCSV.class);
    // Parallelism 1 makes the sink write a single CSV file instead of a directory of part files
    pfids.writeAsText("hdfs://hadoop1:54310/user/hduser/trace_analysis/nirbd_trace_nov_8.csv", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
    
    env.execute("Packet Trace Analysis");
  }
  
  // Packet Information data element
  public static class PktInfo {

    public int pkt_num;
    public String in_if;
    public String out_if;
    public long ts_start;
    public String direction;
    public int size;
    public String src_ip;
    public String dst_ip;
    public String src_port;
    public String dst_port;
    public int vrf;
    public String protocol;
    public String ip_ver;
    public int pkt_in_flow;
    public String cls_state;
    public String app;
    public int num_subcls;
    public int num_exfld;
    public int nbar_in_cycles;
    public int nbar_out_cycles;
    public int fid;
    
    public PktInfo() {};

    public PktInfo(int pkt_num, String in_if, String out_if, long ts_start, String direction, int size,
        String src_ip, String dst_ip, String src_port, String dst_port, int vrf, String protocol, String ip_ver,
        int pkt_in_flow, String cls_state, String app, int num_subcls, int num_exfld, int nbar_in_cycles,
        int nbar_out_cycles) {
      this.pkt_num = pkt_num;
      this.in_if = in_if;
      this.out_if = out_if;
      this.ts_start = ts_start;
      this.direction = direction;
      this.size = size;
      this.src_ip = src_ip;
      this.dst_ip = dst_ip;
      this.src_port = src_port;
      this.dst_port = dst_port;
      this.vrf = vrf;
      this.protocol = protocol;
      this.ip_ver = ip_ver;
      this.pkt_in_flow = pkt_in_flow;
      this.cls_state = cls_state;
      this.app = app;
      this.num_subcls = num_subcls;
      this.num_exfld = num_exfld;
      this.nbar_in_cycles = nbar_in_cycles;
      this.nbar_out_cycles = nbar_out_cycles;
      this.fid = -1;    // FID will be calculated later
    }
    
    public String getApp() {
      return this.app;
    }
    
    @Override
    public String toString() {
      return "PktInfo [pkt_num=" + pkt_num + ", in_if=" + in_if + ", out_if=" + out_if + ", ts_start=" + ts_start
          + ", direction=" + direction + ", size=" + size + ", src_ip=" + src_ip + ", dst_ip=" + dst_ip
          + ", src_port=" + src_port + ", dst_port=" + dst_port + ", vrf=" + vrf + ", protocol=" + protocol
          + ", ip_ver=" + ip_ver + ", pkt_in_flow=" + pkt_in_flow + ", cls_state=" + cls_state + ", app="
          + app + ", num_subcls=" + num_subcls + ", num_exfld=" + num_exfld + ", nbar_in_cycles="
          + nbar_in_cycles + ", nbar_out_cycles=" + nbar_out_cycles + ", fid=" + fid + "]";
    }
  }
  
  // Packet info for CSV print
  public static class PktInfoCSV extends PktInfo {
    @Override
    public String toString() {
      return pkt_num + "," + in_if + "," + out_if + "," + ts_start
          + "," + direction + "," + size + "," + src_ip + "," + dst_ip
          + "," + src_port + "," + dst_port + "," + vrf + "," + protocol
          + "," + ip_ver + "," + pkt_in_flow + "," + cls_state + ","
          + app + "," + num_subcls + "," + num_exfld + ","
          + nbar_in_cycles + "," + nbar_out_cycles + "," + fid;
    }   
  }
  
  // Packet parser: takes one raw packet string and returns the parsed PktInfo structure
  public static class ParsePkt implements MapFunction<String, PktInfo> {
    private static final long serialVersionUID = 1L;

    Pattern pktnum_match = Pattern.compile("^(\\d+)\\s+CBUG ID");
    Pattern if_match     = Pattern.compile("Summary\n  Input     : (.*)\n  Output    : (.*)\n  State     :.*\n  Timestamp\n    Start   : (\\d+) ns");
    Pattern cft_match    = Pattern.compile("  Feature: CFT\n    API.*\n.*\n.*\n.*\n    direction             : (.*)\n.*\n.*\n.*\n.*\n.*\n    cft_l3_payload_size   : (\\d+)\n.*\n.*\n    tuple.src_ip          : (.*)\n    tuple.dst_ip          : (.*)\n    tuple.src_port        : (\\d+)\n    tuple.dst_port        : (\\d+)\n    tuple.vrfid           : (\\d+)\n    tuple.l4_protocol     : (.*)\n    tuple.l3_protocol     : (.*)\n");
    Pattern nbar_match   = Pattern.compile("  Feature: NBAR\n    Packet number in flow: (.*)\n    Classification state: (.*)\n    Classification name: (.*)\n    Classification ID: .*\n    Number of matched sub-classifications: (\\d+)\n    Number of extracted fields: (\\d+)\n");
    Pattern cycin_match  = Pattern.compile("IPV4_INPUT_STILE_LEGACY\n    Lapsed time : (\\d+) ns");
    Pattern cycout_match = Pattern.compile("IPV4_OUTPUT_STILE_LEGACY\n    Lapsed time : (\\d+) ns");
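
    /* The patterns above parse what appears to be Cisco IOS-XE packet-trace
     * output (the "CBUG ID", CFT and NBAR sections). An illustrative fragment
     * of one record, reconstructed from the regexes; the values here are
     * hypothetical, not taken from a real trace:
     *
     *   42 CBUG ID ...
     *   Summary
     *     Input     : GigabitEthernet1
     *     Output    : GigabitEthernet2
     *     State     : FWD
     *     Timestamp
     *       Start   : 1541673600000 ns
     */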

    @Override
    public PktInfo map(String pkt) throws Exception {
      Matcher m;
      int pkt_num = -1;
      String in_if = "ERR";
      String out_if = "ERR";
      long ts_start = -1;
      String direction = "ERR";
      int size = -1;
      String src_ip = "ERR";
      String dst_ip = "ERR";
      String src_port = "ERR";
      String dst_port = "ERR";
      int vrf = -1;
      String protocol = "ERR";
      String ip_ver = "ERR";
      int pkt_in_flow = -1;
      String cls_state = "ERR";
      String app = "ERR";
      int num_subcls = -1;
      int num_exfld = -1;
      int nbar_in_cycles = -1;
      int nbar_out_cycles = -1;

      m = pktnum_match.matcher(pkt);
      if (m.find()) pkt_num = Integer.parseInt(m.group(1));

      m = if_match.matcher(pkt);
      if (m.find()) {
        in_if = m.group(1);
        out_if = m.group(2);
        ts_start = Long.parseLong(m.group(3));
      }

      m = cft_match.matcher(pkt);
      if (m.find()) {
        direction = m.group(1);
        size = Integer.parseInt(m.group(2));
        src_ip = m.group(3);
        dst_ip = m.group(4);
        src_port = m.group(5);
        dst_port = m.group(6);
        vrf = Integer.parseInt(m.group(7));
        protocol = m.group(8);
        ip_ver = m.group(9);
      }

      m = nbar_match.matcher(pkt);
      if (m.find()) {
        if (!m.group(1).equals("N/A")) pkt_in_flow = Integer.parseInt(m.group(1));
        cls_state = m.group(2);
        app = m.group(3);
        num_subcls = Integer.parseInt(m.group(4));
        num_exfld = Integer.parseInt(m.group(5));
      }

      m = cycin_match.matcher(pkt);
      if (m.find()) nbar_in_cycles = Integer.parseInt(m.group(1));

      m = cycout_match.matcher(pkt);
      if (m.find()) nbar_out_cycles = Integer.parseInt(m.group(1));

      return new PktInfo(pkt_num, in_if, out_if, ts_start, direction, size, src_ip, dst_ip, src_port, dst_port,
          vrf, protocol, ip_ver, pkt_in_flow, cls_state, app, num_subcls, num_exfld, nbar_in_cycles, nbar_out_cycles);
    }
  } 
  
  // Helper scalar function to normalize a flow: returns the same key string for both directions of the flow
  public static class NormFlow extends ScalarFunction {
    private static final long serialVersionUID = 1L;

    public NormFlow() {}

    public String eval(String src_ip, String dst_ip, String src_port, String dst_port, String protocol, int vrf) {
      if (src_ip.compareTo(dst_ip) >= 0) {
        return src_ip + "," + dst_ip + "," + src_port + "," + dst_port + "," + protocol + "," + Integer.toString(vrf);
      } else {
        return dst_ip + "," + src_ip + "," + dst_port + "," + src_port + "," + protocol + "," + Integer.toString(vrf);
      }
    }
  }
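
  /* A quick illustration of NormFlow (hypothetical values): the packet
   * 10.0.0.1:1234 -> 192.168.1.5:80 and its reply 192.168.1.5:80 -> 10.0.0.1:1234
   * both normalize to "192.168.1.5,10.0.0.1,80,1234,TCP,0", so the two
   * directions of a connection collapse into a single flow key. */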
  
  // Helper function to assign a unique flow ID to each normalized flow; safe to run with parallel subtasks
  public static class AssignFID extends RichMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;
    private int parallelism;
    private int idCounter;

    @Override
    public void open(Configuration parameters) {
      RuntimeContext ctx = getRuntimeContext();
      parallelism = ctx.getNumberOfParallelSubtasks();
      idCounter = ctx.getIndexOfThisSubtask();
    }

    @Override
    public Tuple2<String, Integer> map(String nflow) {
      Tuple2<String, Integer> output = new Tuple2<>(nflow, idCounter);
      idCounter += parallelism;
      return output;
    }
  }
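
  /* Why AssignFID produces globally unique IDs (a worked example, assuming parallelism 2):
   * subtask 0 starts its counter at 0 and emits 0, 2, 4, ...; subtask 1 starts at 1
   * and emits 1, 3, 5, ... Each subtask strides by the total parallelism, so no two
   * subtasks can ever emit the same ID, without any coordination between them. */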
}  

The Flink UI Job Graph for the code above is given below:

Flink Job Graph
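
To run the job on the cluster, the class is packaged into a jar and submitted through the Flink CLI, along these lines (a sketch; the jar name is an assumption, the entry class is the one above):

flink run -c com.bendvora.flink.PacketTraceAnalysis packet-trace-analysis.jar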