3.1.9.1. Machine Learning Example#

3.1.9.1.1. Network Metrics Database ML#

You can write your ML code in python2 or python3 or SQL.

3.1.9.1.1.1. ML SQL Arima Example#

Example Arima (Apache Madlib) ML Code -



CREATE OR REPLACE PROCEDURE tcp_payload_forecast_5min(
   cap_limit integer DEFAULT 100000,
   miter integer DEFAULT 10,
   tbucket INTERVAL DEFAULT '5 seconds',
   cinterval INTERVAL DEFAULT '60 minutes'
)
LANGUAGE SQL
AS $$
   SET client_min_messages TO WARNING;
   DROP TABLE IF EXISTS tcp_pkt_compute_5min;
   DROP TABLE IF EXISTS tcp_pkt_compute_5min_forecast_output;
   CREATE TEMP TABLE tcp_pkt_compute_5min AS
     SELECT * FROM ( SELECT time_bucket( tbucket, cap_tstamp) as "time",
                     CAST(data->>'tcp_payload_len' AS INTEGER) as "value"
                     FROM L4_metric
                     WHERE data @> '{"ip_proto": "TCP"}'
                     AND cap_tstamp >= now() - cinterval
                     LIMIT cap_limit ) q;

  DROP TABLE IF EXISTS tcp_pkt_compute_output;
  DROP TABLE IF EXISTS tcp_pkt_compute_forecast_output;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_output;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_output_summary;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_output_residual;

  SELECT madlib.arima_train('tcp_pkt_compute_5min',
                            'tcp_pkt_compute_5min_output',
                            'time',
                            'value',
                            NULL,
                            TRUE,
                            ARRAY[1, 1, 1]
                           );
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_forecast_output;
  SELECT madlib.arima_forecast('tcp_pkt_compute_5min_output',
                               'tcp_pkt_compute_5min_forecast_output', miter);
$$;


-- dummy table
CREATE TABLE tcp_pkt_compute_5min_forecast_output(id int, steps_ahead int, forecast_value decimal);

CREATE OR REPLACE PROCEDURE tcp_payload_forecast_5min_copy()
LANGUAGE SQL
AS $$
  SET client_min_messages TO WARNING;

  INSERT INTO tcp_pkt_predict_arima_5min(step,value,tstamp)
    SELECT steps_ahead,forecast_value,now() as tstamp FROM tcp_pkt_compute_5min_forecast_output;

  DROP TABLE IF EXISTS tcp_pkt_compute_output;
  DROP TABLE IF EXISTS tcp_pkt_compute_forecast_output;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_output;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_output_summary;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_output_residual;
  DROP TABLE IF EXISTS tcp_pkt_compute_5min_forecast_output;
$$;


select cron.schedule('*/5 * * * *',
 $$CALL tcp_payload_forecast_5min(100000,10,'5 seconds','60 minutes'); CALL tcp_payload_forecast_5min_copy();$$);

3.1.9.1.1.2. ML SQL Arima Decision Tree Example#

Example Arima Decision Tree ML Code



CREATE OR REPLACE PROCEDURE tcp_flags_decision_tree5min(
   cap_limit integer DEFAULT 100000,
   tbucket INTERVAL DEFAULT '5 seconds',
   cinterval INTERVAL DEFAULT '60 minutes'
)
LANGUAGE SQL
AS $$
   SET client_min_messages TO WARNING;
   DROP TABLE IF EXISTS ref_tcp_dtree_prediction_results;
   DELETE FROM metrics_ref_tcp_dtree;
   INSERT INTO metrics_ref_tcp_dtree
       SELECT time_bucket( tbucket, cap_tstamp) as "time",
         convo_id as "convo_id",
         data#>> '{layers,ip,src}' as "src_host",
         CAST(data#>> '{layers,tcp,srcport}' AS INTEGER) as "src_port",
         data#>> '{layers,ip,dst}' as "dst_host",
         CAST(data#>> '{layers,tcp,dstport}' AS INTEGER) as "dst_port",
         ('x' || right(data#>>'{layers,ip,flags}',2))::bit(8)::int as "ip_flags",
         (data#>> '{layers,ip,flags_df}')::boolean as "ip_df",
         (data#>> '{layers,ip,flags_mf}')::boolean as "ip_mf",
         CAST(data#>> '{layers,ip,len}' AS INTEGER) as "ip_len",
         CAST(data#>> '{layers,ip,hdr_len}'AS INTEGER)  as "ip_hlen",
         CAST(data#>> '{layers,tcp,len}'AS INTEGER)  as "tcp_plen",
         CAST(data#>> '{layers,tcp,hdr_len}'AS INTEGER)  as "tcp_hlen",
         data#>> '{layers,tcp,text}' as "tcp_text",
         data#>> '{layers,tcp,flags_str}' as "tcp_flags_text",
         ('x' || right(data#>>'{layers,tcp,flags}',4))::bit(16)::int as "tcp_flags",
         (data#>> '{layers,tcp,flags_ns}')::boolean as "tcp_fl_ns",
         (data#>> '{layers,tcp,flags_ack}')::boolean as "tcp_fl_ack",
         (data#>> '{layers,tcp,flags_cwr}')::boolean as "tcp_fl_cwr",
         (data#>> '{layers,tcp,flags_ecn}')::boolean as "tcp_fl_ecn",
         (data#>> '{layers,tcp,flags_fin}')::boolean as "tcp_fl_fin",
         (data#>> '{layers,tcp,flags_res}')::boolean as "tcp_fl_res",
         (data#>> '{layers,tcp,flags_syn}')::boolean as "tcp_fl_syn",
         (data#>> '{layers,tcp,flags_urg}')::boolean as "tcp_fl_urg",
         (data#>> '{layers,tcp,flags_push}')::boolean as "tcp_fl_push",
         (data#>> '{layers,tcp,flags_reset}')::boolean as "tcp_fl_rest",
         CAST(data#>> '{layers,tcp,_ws_expert,group}' AS INTEGER) as "tcp_exp_group",
         CAST(data#>> '{layers,tcp,_ws_expert,severity}' AS INTEGER) as "tcp_exp_severity"
       FROM wire_metric
       WHERE data @> '{"layers": {"tcp": {}}}' AND cap_tstamp >= now() - cinterval
       LIMIT cap_limit;
   --
   SELECT madlib.tree_predict('ref_tcp_dtree_train_output',       -- tree model
                              'metrics_ref_tcp_dtree',            -- new data table
                              'ref_tcp_dtree_prediction_results', -- output table
                              'prob');
$$;



CREATE OR REPLACE FUNCTION tcp_flags_decision_tree5min_copy()
 RETURNS INTEGER
AS $$
   qry_dt = "INSERT INTO ml_metric_detection_data(convo_id,src_host,src_port,dst_host,dst_port,class,observation_weight,tcp_flags,alert,ok,ml_name,cycle,ml_algo,ml_features,ml_output,ml_tree_model,ml_output_table,tstamp) SELECT g.convo_id, g.src_host,g.src_port,g.dst_host,g.dst_port, class,observation_weight,g.tcp_flags_text, p.\"estimated_prob_ALERT\" as \"alert\", p.\"estimated_prob_OK\" as \"ok\",\'flags spec decision tree\' as ml_name,\'5min\' as cycle,\'decision tree\' as ml_algo,\'tcp_plen,tcp_hlen,tcp_flags,tcp_exp_group,tcp_exp_severity\' as ml_features,\'{}\' as ml_output,\'ref_tcp_dtree_train_output\' as ml_tree_model,\'ref_tcp_dtree_prediction_results\' as ml_output_table,now() as tstamp FROM ref_tcp_dtree_prediction_results p, metrics_ref_tcp_dtree g where p.convo_id = g.convo_id AND p.\"estimated_prob_ALERT\" = 1 ORDER BY g.convo_id;"
   rv = plpy.execute( qry_dt )
   return 1;
$$ LANGUAGE plpython2u;


-- for the decision tree ml job
select cron.schedule('*/5 * * * *',
    $$CALL tcp_flags_decision_tree5min(100000,'5 seconds','60 minutes');SELECT tcp_flags_decision_tree5min_copy();$$);


3.1.9.1.1.3. ML SQL Test Tensorflow#

Example Tensorflow ML Code -


CREATE OR REPLACE FUNCTION test_tf(
   cap_limit integer DEFAULT 10000, miter integer DEFAULT 10,
   tbucket INTERVAL DEFAULT '5 seconds', cinterval INTERVAL DEFAULT '60 minutes'
)
 RETURNS INTEGER
AS $$
   import numpy as np;
   import pandas as pd;
   import tensorflow as tf;

   qry = "SELECT * FROM ( SELECT time_bucket( '5 seconds', cap_tstamp) as time, CONCAT_WS('',data->>'src_host',':',data->'src_port','-',data->>'dst_host', ':',data->'dst_port') as host_con, cast(data->>'tcp_flags' AS INTEGER) as flags, cast(data->>'tcp_hdr_len' AS INTEGER) as hdr_len, cast(data->>'tcp_payload_len' AS INTEGER) as tcp_len, data->>'tcp_flag_string' as flags_str FROM L4_metric WHERE data @> '{\"ip_proto\": \"TCP\"}' AND cap_tstamp >= now() - INTERVAL '30 minutes' LIMIT 100000 ) q;"
   plpy.log("qry: {}".format(qry))
   dataset = []
   for row in plpy.cursor( qry ):
       dataset.append( row)
   dataset = pd.DataFrame( dataset )
   dataset_features = dataset.copy()
   dataset_labels = dataset_features.pop('flags')
   plpy.log("dataset: {}".format(dataset))
   inputs = {}
   for name, col in dataset_features.items():
       plpy.log(" name '{}' col '{}'".format(name,col))
       dtype = col.dtype
       if dtype == object:
          dtype = tf.string
       else:
          dtype = tf.float32
       inputs[name] = tf.keras.Input(shape=(1,), name=name, dtype=dtype)
   plpy.log(" inputs {}".format(inputs))
   numeric_inputs = {name:input for name, input in inputs.items()
                     if input.dtype==tf.float32}
   x = tf.keras.layers.Concatenate()(list(numeric_inputs.values()))
   norm = tf.keras.layers.experimental.preprocessing.Normalization()
   norm.adapt(np.array(dataset[numeric_inputs.keys()]))
   all_numeric_inputs = norm(x)
   plpy.log(" numeric_inputs {}".format(all_numeric_inputs))
   preprocessed_inputs = [all_numeric_inputs]
   for name, input in inputs.items():
       if input.dtype == tf.float32:
          continue
       lookup = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=np.unique(dataset_features[name]))
       one_hot = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=lookup.vocab_size())
       x = lookup(input)
       x = one_hot(x)
       preprocessed_inputs.append(x)

   preprocessed_inputs_cat = tf.keras.layers.Concatenate()(preprocessed_inputs)
   bits_preprocessing = tf.keras.Model(inputs, preprocessed_inputs_cat)

   dataset_features_dict = {name: np.array(value) 
                            for name, value in dataset_features.items()}
   dataset_dict = {name:values[:1] for name, values in dataset_features_dict.items()}
   bits_preprocessing(dataset_dict)
   plpy.log(" bits_dataset {}".format(bits_preprocessing))
   def makemodel(preprocessing_head, inputs):
       body = tf.keras.Sequential([
                   tf.keras.layers.Dense(64),
                   tf.keras.layers.Dense(1) ])

       preprocessed_inputs = preprocessing_head(inputs)
       result = body(preprocessed_inputs)
       model = tf.keras.Model(inputs, result)

       model.compile(loss=tf.losses.BinaryCrossentropy(from_logits=True),
                     optimizer=tf.optimizers.Adam())
       return model

   bits_model = makemodel(bits_preprocessing, inputs)
   bits_model.fit(x=dataset_features_dict, y=dataset_labels, epochs=10)

   return 1; 
$$ LANGUAGE plpython3u;

3.1.9.1.1.4. ML PYthon Example Code#

Example Tensorflow Python Code -


import numpy as np
import pandas as pd
import tensorflow as tf
#from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import math
#

class DataToTfModel(object):
      def __init__(self, plpy, qry):
          super(object,self).__init__()
          self.plpy = plpy
          self.qry = qry
          self.dataset = []
          self.dataset_features = None
          self.dataset_labels = None
          self.dataset_features_dict = None
          self.dataset_dict = None
          self.inputs = {}
          self.bits_preprocessing = None
      def get_dataset(self, labels='flags'):
          for row in self.plpy.cursor(self.qry):
              self.dataset.append(row) # populate the dataset with rows from db
          self.dataset = pd.DataFrame(self.dataset) # push into a pandas dataframe
          self.dataset_features = self.dataset.copy()
          self.dataset_labels = self.dataset_features.pop( labels )
      def map_to_keras_inputs(self):
          for name, col in self.dataset_features.items():
              self.plpy.log(" name '{}' col '{}'".format(name,col))
              dtype = col.dtype
              if dtype == object:
                 dtype = tf.string
              else:
                 dtype = tf.float32
              self.inputs[name] = tf.keras.Input(shape=(1,), name=name, dtype=dtype)
              self.plpy.log(" inputs {}".format(self.inputs))
      def fix_inputs(self):
          # 1st step concentrate the numeric inputs together and run them
          # though a normalization layer
          numeric_inputs = {name:input for name, input in self.inputs.items()
                            if input.dtype==tf.float32}
          x = tf.keras.layers.Concatenate()(list(numeric_inputs.values()))
          norm = tf.keras.layers.experimental.preprocessing.Normalization()
          norm.adapt(np.array(self.dataset[numeric_inputs.keys()]))
          all_numeric_inputs = norm(x)
          self.plpy.log(" numeric_inputs {}".format(all_numeric_inputs))
          return all_numeric_inputs
      def fix_symbol_processing(self, all_numeric_inputs):
          # collect all the symbolic preprocessing results, to concatenate them together
          preprocessed_inputs = [all_numeric_inputs]
          for name, input in self.inputs.items():
              if input.dtype == tf.float32:
                 continue
              lookup = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=np.unique(self.dataset_features[name]))
              one_hot = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=lookup.vocab_size())
              x = lookup(input)
              x = one_hot(x)
              preprocessed_inputs.append(x)
          # concatenate all the preprocessed inputs together and build a model that
          # handles the preprocessing
          preprocessed_inputs_cat = tf.keras.layers.Concatenate()(preprocessed_inputs)
          self.bits_preprocessing = tf.keras.Model(self.inputs, preprocessed_inputs_cat)
          # convert to a dictionary of tensors
          self.dataset_features_dict = {name: np.array(value)
                                        for name, value in self.dataset_features.items()}
      def slice_training_example(self):
          # slice out our first training example and pass it to preprocessing model
          # the numberic ones and strings ones are all concatenated together
          self.dataset_dict = {name:values[:1] for name, values in self.dataset_features_dict.items()}
          self.bits_preprocessing(self.dataset_dict)
          self.plpy.log(" bits_dataset {}".format(self.bits_preprocessing))
      def makemodel(self, preprocessing_head, inputs):
          # build the model on top of this
          body = tf.keras.Sequential([ tf.keras.layers.Dense(64), tf.keras.layers.Dense(1) ])

          preprocessed_inputs = preprocessing_head(self.inputs)
          result = body(preprocessed_inputs)
          model = tf.keras.Model(self.inputs, result)

          model.compile(loss=tf.losses.BinaryCrossentropy(from_logits=True),
                        optimizer=tf.optimizers.Adam())
          return model
#
#
MAX_TIMESTEPS = 128
TRAINING_SIZE_PERCENTAGE = 0.65  # 65 percent of data will be test size
DEFAULT_ONE = 1
DEFAULT_ZERO = 0
class DataToTfModelLTSM(object):
      def __init__(self, plpy, frange, qry):
          super(object,self).__init__()
          self.plpy = plpy
          self.qry = qry
          self.dataset = []
          self.dataset_dict = None
          self.scaler = None
          self.scaler = MinMaxScaler(feature_range=frange)
          self.train_X = None
          self.train_y = None
          self.max_time_steps = MAX_TIMESTEPS
          self.training_size_percent = TRAINING_SIZE_PERCENTAGE
          self.default_1 = 1
          self.default_0 = 0
      def set_max_time_steps(self, value=120):
          self.max_time_steps = value
      def set_training_size_percentage(self, value=0.65):
          self.training_size_percent = value
      def set_default_one(self, value=1):
          self.default_1 = value
      def set_default_zero(self, value=0):
          self.default_0 = value
      def create_dataset(self, dataset, time_step=1):
          dX, dY = [], []
          for i in range(len(dataset) - time_step - self.default_1):
              a = dataset[i:(i+time_step), self.default_0]   ## i=0, 0,1,2,3 etc
              dX.append(a)
              dY.append(dataset[i + time_step, self.default_0])
          return np.array(dX), np.array(dY)
      def get_dataset(self, labels='flags'):
          for row in self.plpy.cursor(self.qry):
              self.dataset.append(row) # populate the dataset with rows from db
          self.dataset = pd.DataFrame(self.dataset) # push into a pandas dataframe
      def scale_dataset(self, dataset, Label='flags', time_step=100):
          # scale datset
          self.scaled_data = self.scaler.fit_transform(
                                  dataset[Label].values.reshape(-1,1)
                             )
          self.scaled_dlen = len(self.scaled_data)
      def create_test_train_dataset(self, time_step=100):
          # create dataset
          training_size = int(self.scaled_dlen * TRAINING_SIZE_PERCENTAGE)
          test_size = self.scaled_dlen - training_size
          train_data = self.scaled_data[0:training_size,:]
          test_data = self.scaled_data[training_size:self.scaled_dlen,:1]
          #
          self.X_train, self.y_train = self.create_dataset(train_data, time_step)
          self.X_test, self.y_test = self.create_dataset(test_data, time_step)
          self.X_train = self.X_train.reshape(self.X_train.shape[0], self.X_train.shape[1], 1)
          self.X_test = self.X_test.reshape(self.X_test.shape[0], self.X_test.shape[1], 1)
      def makemodel(self, ltunits=50, time_step=100):
          m = tf.keras.models.Sequential()
          # create a Stacked LSTM model
          m.add(tf.keras.layers.LSTM(units=ltunits, return_sequences=True, input_shape=(time_step, 1)))
          m.add(tf.keras.layers.Dropout(0.2))
          m.add(tf.keras.layers.LSTM(units=ltunits, return_sequences=True))
          m.add(tf.keras.layers.Dropout(0.2))
          m.add(tf.keras.layers.LSTM(units=ltunits))
          m.add(tf.keras.layers.Dropout(0.2))
          m.add(tf.keras.layers.Dense(units=1)) # prediction of next closing value
          m.compile(optimizer='adam',  loss='mean_squared_error')
          return m