3.1.9.1. Machine Learning Example#
3.1.9.1.1. Network Metrics Database ML#
You can write your ML code in Python 2, Python 3, or SQL.
3.1.9.1.1.1. ML SQL Arima Example#
Example ARIMA (Apache MADlib) ML code:
-- Trains an ARIMA(1,1,1) model (Apache MADlib) on recent TCP payload lengths
-- read from L4_metric and writes a forecast into
-- tcp_pkt_compute_5min_forecast_output.
--
-- Parameters:
--   cap_limit  maximum number of L4_metric rows copied into the training table
--   miter      number of steps ahead passed to madlib.arima_forecast
--   tbucket    bucket width handed to time_bucket() for cap_tstamp
--   cinterval  look-back window: only rows with cap_tstamp >= now() - cinterval
--              are considered
CREATE OR REPLACE PROCEDURE tcp_payload_forecast_5min(
    cap_limit integer DEFAULT 100000,
    miter integer DEFAULT 10,
    tbucket INTERVAL DEFAULT '5 seconds',
    cinterval INTERVAL DEFAULT '60 minutes'
)
LANGUAGE SQL
AS $$
    SET client_min_messages TO WARNING;

    -- Remove everything a previous run may have left behind. arima_train and
    -- arima_forecast are given these output table names below, so they must
    -- not exist when those calls run.
    DROP TABLE IF EXISTS tcp_pkt_compute_5min;
    DROP TABLE IF EXISTS tcp_pkt_compute_output;
    DROP TABLE IF EXISTS tcp_pkt_compute_forecast_output;
    DROP TABLE IF EXISTS tcp_pkt_compute_5min_output;
    DROP TABLE IF EXISTS tcp_pkt_compute_5min_output_summary;
    DROP TABLE IF EXISTS tcp_pkt_compute_5min_output_residual;
    DROP TABLE IF EXISTS tcp_pkt_compute_5min_forecast_output;

    -- Training input: one row per matching packet record, timestamp snapped
    -- to the bucket. ORDER BY makes the LIMIT deterministic: when more than
    -- cap_limit rows fall inside the window, the most recent ones are kept
    -- instead of an arbitrary subset.
    -- NOTE(review): rows are not aggregated per bucket, so several rows can
    -- share the same "time" value - confirm this is what the model expects.
    CREATE TEMP TABLE tcp_pkt_compute_5min AS
    SELECT
        time_bucket(tbucket, cap_tstamp) AS "time",
        CAST(data->>'tcp_payload_len' AS INTEGER) AS "value"
    FROM L4_metric
    WHERE data @> '{"ip_proto": "TCP"}'
      AND cap_tstamp >= now() - cinterval
    ORDER BY cap_tstamp DESC
    LIMIT cap_limit;

    SELECT madlib.arima_train(
        'tcp_pkt_compute_5min',         -- input table
        'tcp_pkt_compute_5min_output',  -- model output table
        'time',                         -- timestamp column
        'value',                        -- time-series column
        NULL,                           -- no grouping columns
        TRUE,                           -- include mean
        ARRAY[1, 1, 1]                  -- non-seasonal orders (p, d, q)
    );

    SELECT madlib.arima_forecast(
        'tcp_pkt_compute_5min_output',           -- trained model table
        'tcp_pkt_compute_5min_forecast_output',  -- forecast output table
        miter                                    -- steps ahead
    );
$$;
-- Dummy table with the columns tcp_payload_forecast_5min_copy reads
-- (steps_ahead, forecast_value). Presumably it exists so that procedure can
-- be created before the first forecast run has produced the real table -
-- confirm. IF NOT EXISTS keeps this script re-runnable when the table is
-- already present.
CREATE TABLE IF NOT EXISTS tcp_pkt_compute_5min_forecast_output (
    id int,
    steps_ahead int,
    forecast_value decimal
);
-- Archives the current forecast: copies every row of
-- tcp_pkt_compute_5min_forecast_output into tcp_pkt_predict_arima_5min,
-- stamped with now(), then drops the ARIMA working tables.
CREATE OR REPLACE PROCEDURE tcp_payload_forecast_5min_copy()
LANGUAGE SQL
AS $$
    SET client_min_messages TO WARNING;

    INSERT INTO tcp_pkt_predict_arima_5min (step, value, tstamp)
    SELECT
        f.steps_ahead,
        f.forecast_value,
        now() AS tstamp
    FROM tcp_pkt_compute_5min_forecast_output AS f;

    -- Working tables are no longer needed once the forecast has been copied.
    DROP TABLE IF EXISTS
        tcp_pkt_compute_output,
        tcp_pkt_compute_forecast_output,
        tcp_pkt_compute_5min_output,
        tcp_pkt_compute_5min_output_summary,
        tcp_pkt_compute_5min_output_residual,
        tcp_pkt_compute_5min_forecast_output;
$$;
-- pg_cron job, every 5 minutes: rebuild the ARIMA forecast, then archive it.
SELECT cron.schedule(
    '*/5 * * * *',
    $$CALL tcp_payload_forecast_5min(100000,10,'5 seconds','60 minutes'); CALL tcp_payload_forecast_5min_copy();$$
);
3.1.9.1.1.2. ML SQL Decision Tree Example#
Example decision tree (Apache MADlib) ML code:
-- Rebuilds metrics_ref_tcp_dtree from recent TCP records in wire_metric and
-- scores it with the MADlib decision tree model ref_tcp_dtree_train_output,
-- writing class probabilities to ref_tcp_dtree_prediction_results.
--
-- Parameters:
--   cap_limit  maximum number of wire_metric rows loaded for scoring
--   tbucket    bucket width handed to time_bucket() for cap_tstamp
--   cinterval  look-back window: only rows with cap_tstamp >= now() - cinterval
--              are considered
CREATE OR REPLACE PROCEDURE tcp_flags_decision_tree5min(
    cap_limit integer DEFAULT 100000,
    tbucket INTERVAL DEFAULT '5 seconds',
    cinterval INTERVAL DEFAULT '60 minutes'
)
LANGUAGE SQL
AS $$
    SET client_min_messages TO WARNING;

    -- tree_predict is given this output table name below, so clear it first.
    DROP TABLE IF EXISTS ref_tcp_dtree_prediction_results;

    -- Intentionally unfiltered: the feature table is rebuilt from scratch on
    -- every run.
    DELETE FROM metrics_ref_tcp_dtree;

    -- There is no column list on the INSERT, so values map to
    -- metrics_ref_tcp_dtree columns by position; the aliases below are labels
    -- only.
    INSERT INTO metrics_ref_tcp_dtree
    SELECT
        time_bucket(tbucket, cap_tstamp) AS "time",
        convo_id AS "convo_id",
        data #>> '{layers,ip,src}' AS "src_host",
        CAST(data #>> '{layers,tcp,srcport}' AS INTEGER) AS "src_port",
        data #>> '{layers,ip,dst}' AS "dst_host",
        CAST(data #>> '{layers,tcp,dstport}' AS INTEGER) AS "dst_port",
        -- last 2 characters of the flags text, read as hex digits
        ('x' || right(data #>> '{layers,ip,flags}', 2))::bit(8)::int AS "ip_flags",
        (data #>> '{layers,ip,flags_df}')::boolean AS "ip_df",
        (data #>> '{layers,ip,flags_mf}')::boolean AS "ip_mf",
        CAST(data #>> '{layers,ip,len}' AS INTEGER) AS "ip_len",
        CAST(data #>> '{layers,ip,hdr_len}' AS INTEGER) AS "ip_hlen",
        CAST(data #>> '{layers,tcp,len}' AS INTEGER) AS "tcp_plen",
        CAST(data #>> '{layers,tcp,hdr_len}' AS INTEGER) AS "tcp_hlen",
        data #>> '{layers,tcp,text}' AS "tcp_text",
        data #>> '{layers,tcp,flags_str}' AS "tcp_flags_text",
        -- last 4 characters of the flags text, read as hex digits
        ('x' || right(data #>> '{layers,tcp,flags}', 4))::bit(16)::int AS "tcp_flags",
        (data #>> '{layers,tcp,flags_ns}')::boolean AS "tcp_fl_ns",
        (data #>> '{layers,tcp,flags_ack}')::boolean AS "tcp_fl_ack",
        (data #>> '{layers,tcp,flags_cwr}')::boolean AS "tcp_fl_cwr",
        (data #>> '{layers,tcp,flags_ecn}')::boolean AS "tcp_fl_ecn",
        (data #>> '{layers,tcp,flags_fin}')::boolean AS "tcp_fl_fin",
        (data #>> '{layers,tcp,flags_res}')::boolean AS "tcp_fl_res",
        (data #>> '{layers,tcp,flags_syn}')::boolean AS "tcp_fl_syn",
        (data #>> '{layers,tcp,flags_urg}')::boolean AS "tcp_fl_urg",
        (data #>> '{layers,tcp,flags_push}')::boolean AS "tcp_fl_push",
        (data #>> '{layers,tcp,flags_reset}')::boolean AS "tcp_fl_rest",
        CAST(data #>> '{layers,tcp,_ws_expert,group}' AS INTEGER) AS "tcp_exp_group",
        CAST(data #>> '{layers,tcp,_ws_expert,severity}' AS INTEGER) AS "tcp_exp_severity"
    FROM wire_metric
    WHERE data @> '{"layers": {"tcp": {}}}'
      AND cap_tstamp >= now() - cinterval
    -- ORDER BY makes the LIMIT deterministic: when more than cap_limit rows
    -- fall inside the window, the most recent ones are kept instead of an
    -- arbitrary subset.
    ORDER BY cap_tstamp DESC
    LIMIT cap_limit;

    SELECT madlib.tree_predict(
        'ref_tcp_dtree_train_output',        -- tree model
        'metrics_ref_tcp_dtree',             -- new data table
        'ref_tcp_dtree_prediction_results',  -- output table
        'prob'
    );
$$;
-- Copies decision-tree results into ml_metric_detection_data: joins
-- ref_tcp_dtree_prediction_results with metrics_ref_tcp_dtree on convo_id and
-- keeps only rows whose "estimated_prob_ALERT" equals 1. Always returns 1.
CREATE OR REPLACE FUNCTION tcp_flags_decision_tree5min_copy()
RETURNS INTEGER
LANGUAGE plpython2u
AS $$
# The whole copy is a single INSERT ... SELECT executed through plpy.
insert_alerts_sql = "INSERT INTO ml_metric_detection_data(convo_id,src_host,src_port,dst_host,dst_port,class,observation_weight,tcp_flags,alert,ok,ml_name,cycle,ml_algo,ml_features,ml_output,ml_tree_model,ml_output_table,tstamp) SELECT g.convo_id, g.src_host,g.src_port,g.dst_host,g.dst_port, class,observation_weight,g.tcp_flags_text, p.\"estimated_prob_ALERT\" as \"alert\", p.\"estimated_prob_OK\" as \"ok\",\'flags spec decision tree\' as ml_name,\'5min\' as cycle,\'decision tree\' as ml_algo,\'tcp_plen,tcp_hlen,tcp_flags,tcp_exp_group,tcp_exp_severity\' as ml_features,\'{}\' as ml_output,\'ref_tcp_dtree_train_output\' as ml_tree_model,\'ref_tcp_dtree_prediction_results\' as ml_output_table,now() as tstamp FROM ref_tcp_dtree_prediction_results p, metrics_ref_tcp_dtree g where p.convo_id = g.convo_id AND p.\"estimated_prob_ALERT\" = 1 ORDER BY g.convo_id;"
plpy.execute(insert_alerts_sql)
# The result of the INSERT is not inspected; the return value is a constant.
return 1
$$;
-- pg_cron job for the decision tree ML job, every 5 minutes: score the latest
-- TCP records, then copy the flagged rows.
SELECT cron.schedule(
    '*/5 * * * *',
    $$CALL tcp_flags_decision_tree5min(100000,'5 seconds','60 minutes');SELECT tcp_flags_decision_tree5min_copy();$$
);
3.1.9.1.1.3. ML SQL Test Tensorflow#
Example TensorFlow ML code:
-- Test function: pulls recent TCP rows from L4_metric, builds a Keras
-- preprocessing model (normalised numeric columns + one-hot string columns)
-- and fits a small dense network with the 'flags' column as the label.
--
-- Parameters:
--   cap_limit  maximum number of rows fetched (bound to LIMIT)
--   miter      number of training epochs
--   tbucket    bucket width handed to time_bucket() for cap_tstamp
--   cinterval  look-back window: only rows with cap_tstamp >= now() - cinterval
--              are fetched
-- Returns 1 after training, or 0 when the query produced no rows.
CREATE OR REPLACE FUNCTION test_tf(
    cap_limit integer DEFAULT 10000, miter integer DEFAULT 10,
    tbucket INTERVAL DEFAULT '5 seconds', cinterval INTERVAL DEFAULT '60 minutes'
)
RETURNS INTEGER
AS $$
import numpy as np
import pandas as pd
import tensorflow as tf

# The function arguments are bound as query parameters ($1..$3) instead of the
# previously hard-coded '5 seconds' / '30 minutes' / 100000 literals, so the
# signature actually controls what is fetched.
qry = """
SELECT time_bucket($1, cap_tstamp) as time,
       CONCAT_WS('',data->>'src_host',':',data->'src_port','-',data->>'dst_host', ':',data->'dst_port') as host_con,
       cast(data->>'tcp_flags' AS INTEGER) as flags,
       cast(data->>'tcp_hdr_len' AS INTEGER) as hdr_len,
       cast(data->>'tcp_payload_len' AS INTEGER) as tcp_len,
       data->>'tcp_flag_string' as flags_str
FROM L4_metric
WHERE data @> '{"ip_proto": "TCP"}'
  AND cap_tstamp >= now() - $2
LIMIT $3
"""
plpy.log("qry: {}".format(qry))
plan = plpy.prepare(qry, ["interval", "interval", "integer"])
rows = list(plpy.cursor(plan, [tbucket, cinterval, cap_limit]))
if not rows:
    # Without rows there is no 'flags' column to pop and nothing to train on.
    plpy.log("test_tf: query returned no rows, skipping training")
    return 0

dataset = pd.DataFrame(rows)
dataset_features = dataset.copy()
dataset_labels = dataset_features.pop('flags')
plpy.log("dataset: {}".format(dataset))

# One symbolic Keras input per feature column: object columns become string
# inputs, everything else float32.
inputs = {}
for name, col in dataset_features.items():
    plpy.log(" name '{}' col '{}'".format(name, col))
    if col.dtype == object:
        dtype = tf.string
    else:
        dtype = tf.float32
    inputs[name] = tf.keras.Input(shape=(1,), name=name, dtype=dtype)
plpy.log(" inputs {}".format(inputs))

# Numeric inputs are concatenated and passed through one normalization layer
# adapted on the fetched data.
numeric_inputs = {name: tensor for name, tensor in inputs.items()
                  if tensor.dtype == tf.float32}
x = tf.keras.layers.Concatenate()(list(numeric_inputs.values()))
norm = tf.keras.layers.experimental.preprocessing.Normalization()
norm.adapt(np.array(dataset[list(numeric_inputs)]))
all_numeric_inputs = norm(x)
plpy.log(" numeric_inputs {}".format(all_numeric_inputs))

# String inputs are mapped to vocabulary indices and one-hot encoded.
preprocessed_inputs = [all_numeric_inputs]
for name, tensor in inputs.items():
    if tensor.dtype == tf.float32:
        continue
    lookup = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=np.unique(dataset_features[name]))
    one_hot = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=lookup.vocab_size())
    x = lookup(tensor)
    x = one_hot(x)
    preprocessed_inputs.append(x)

preprocessed_inputs_cat = tf.keras.layers.Concatenate()(preprocessed_inputs)
bits_preprocessing = tf.keras.Model(inputs, preprocessed_inputs_cat)

dataset_features_dict = {name: np.array(value)
                         for name, value in dataset_features.items()}
# Run the first row through the preprocessing model once.
dataset_dict = {name: values[:1] for name, values in dataset_features_dict.items()}
bits_preprocessing(dataset_dict)
plpy.log(" bits_dataset {}".format(bits_preprocessing))


def makemodel(preprocessing_head, inputs):
    # Two dense layers stacked on top of the preprocessing model.
    body = tf.keras.Sequential([
        tf.keras.layers.Dense(64),
        tf.keras.layers.Dense(1)])
    preprocessed = preprocessing_head(inputs)
    result = body(preprocessed)
    model = tf.keras.Model(inputs, result)
    model.compile(loss=tf.losses.BinaryCrossentropy(from_logits=True),
                  optimizer=tf.optimizers.Adam())
    return model


bits_model = makemodel(bits_preprocessing, inputs)
bits_model.fit(x=dataset_features_dict, y=dataset_labels, epochs=miter)
return 1
$$ LANGUAGE plpython3u;
3.1.9.1.1.4. ML Python Example Code#
Example TensorFlow Python code:
import numpy as np
import pandas as pd
import tensorflow as tf
#from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
import math
#
class DataToTfModel(object):
    """Loads query rows through ``plpy`` and builds a Keras model on them.

    The methods are written to be called in this order: ``get_dataset``,
    ``map_to_keras_inputs``, ``fix_inputs``, ``fix_symbol_processing``,
    ``slice_training_example``, ``makemodel``. Each step stores its result on
    the instance for the next one.
    """

    def __init__(self, plpy, qry):
        """Store the database handle and the query; nothing is executed yet.

        plpy -- object providing ``cursor(query)`` and ``log(message)``
        qry  -- SQL text passed to ``plpy.cursor``
        """
        super(DataToTfModel, self).__init__()
        self.plpy = plpy
        self.qry = qry
        self.dataset = []
        self.dataset_features = None
        self.dataset_labels = None
        self.dataset_features_dict = None
        self.dataset_dict = None
        self.inputs = {}
        self.bits_preprocessing = None

    def get_dataset(self, labels='flags'):
        """Run the query and split the result into features and labels.

        labels -- name of the column popped out of the features as the label
        """
        # Collect into a fresh list so that calling this method again
        # re-reads the data instead of appending to the DataFrame left in
        # self.dataset by the previous call.
        rows = [row for row in self.plpy.cursor(self.qry)]
        self.dataset = pd.DataFrame(rows)
        self.dataset_features = self.dataset.copy()
        self.dataset_labels = self.dataset_features.pop(labels)

    def map_to_keras_inputs(self):
        """Create one symbolic ``tf.keras.Input`` per feature column.

        Columns with dtype ``object`` become ``tf.string`` inputs; every other
        column becomes ``tf.float32``.
        """
        for name, col in self.dataset_features.items():
            self.plpy.log(" name '{}' col '{}'".format(name, col))
            if col.dtype == object:
                dtype = tf.string
            else:
                dtype = tf.float32
            self.inputs[name] = tf.keras.Input(shape=(1,), name=name, dtype=dtype)
        self.plpy.log(" inputs {}".format(self.inputs))

    def fix_inputs(self):
        """Concatenate the float32 inputs and normalise them.

        Returns the output tensor of the normalization layer.
        """
        numeric_inputs = {name: tensor for name, tensor in self.inputs.items()
                          if tensor.dtype == tf.float32}
        x = tf.keras.layers.Concatenate()(list(numeric_inputs.values()))
        norm = tf.keras.layers.experimental.preprocessing.Normalization()
        # The layer's statistics are adapted on the matching dataset columns.
        norm.adapt(np.array(self.dataset[list(numeric_inputs)]))
        all_numeric_inputs = norm(x)
        self.plpy.log(" numeric_inputs {}".format(all_numeric_inputs))
        return all_numeric_inputs

    def fix_symbol_processing(self, all_numeric_inputs):
        """Build the preprocessing model and the feature dictionary.

        Every non-float32 input goes through a ``StringLookup`` followed by a
        ``CategoryEncoding``; the results are concatenated with
        ``all_numeric_inputs`` and wrapped in ``self.bits_preprocessing``.
        ``self.dataset_features_dict`` is filled with one numpy array per
        feature column.
        """
        preprocessed_inputs = [all_numeric_inputs]
        for name, tensor in self.inputs.items():
            if tensor.dtype == tf.float32:
                continue
            lookup = tf.keras.layers.experimental.preprocessing.StringLookup(vocabulary=np.unique(self.dataset_features[name]))
            one_hot = tf.keras.layers.experimental.preprocessing.CategoryEncoding(max_tokens=lookup.vocab_size())
            x = lookup(tensor)
            x = one_hot(x)
            preprocessed_inputs.append(x)
        # One model that performs all of the preprocessing.
        preprocessed_inputs_cat = tf.keras.layers.Concatenate()(preprocessed_inputs)
        self.bits_preprocessing = tf.keras.Model(self.inputs, preprocessed_inputs_cat)
        self.dataset_features_dict = {name: np.array(value)
                                      for name, value in self.dataset_features.items()}

    def slice_training_example(self):
        """Run the first row of every feature through the preprocessing model."""
        self.dataset_dict = {name: values[:1]
                             for name, values in self.dataset_features_dict.items()}
        self.bits_preprocessing(self.dataset_dict)
        self.plpy.log(" bits_dataset {}".format(self.bits_preprocessing))

    def makemodel(self, preprocessing_head, inputs):
        """Stack Dense(64) -> Dense(1) on ``preprocessing_head`` and compile it.

        Returns the compiled ``tf.keras.Model`` (binary cross-entropy on
        logits, Adam optimizer).

        NOTE(review): the ``inputs`` argument is accepted but not used; the
        model is always built from ``self.inputs``. Kept as-is so existing
        callers behave the same - confirm which one is intended.
        """
        body = tf.keras.Sequential([tf.keras.layers.Dense(64), tf.keras.layers.Dense(1)])
        preprocessed_inputs = preprocessing_head(self.inputs)
        result = body(preprocessed_inputs)
        model = tf.keras.Model(self.inputs, result)
        model.compile(loss=tf.losses.BinaryCrossentropy(from_logits=True),
                      optimizer=tf.optimizers.Adam())
        return model
#
#
MAX_TIMESTEPS = 128
# Fraction of the scaled series used for training; the remainder is the test split.
TRAINING_SIZE_PERCENTAGE = 0.65
DEFAULT_ONE = 1
DEFAULT_ZERO = 0


class DataToTfModelLTSM(object):
    """Prepares a single-column series for a stacked LSTM and builds the model.

    Intended call order: ``get_dataset``, ``scale_dataset``,
    ``create_test_train_dataset``, ``makemodel``.
    """

    def __init__(self, plpy, frange, qry):
        """Store the database handle and query and create the scaler.

        plpy   -- object providing ``cursor(query)``
        frange -- ``feature_range`` handed to ``MinMaxScaler``
        qry    -- SQL text passed to ``plpy.cursor``
        """
        super(DataToTfModelLTSM, self).__init__()
        self.plpy = plpy
        self.qry = qry
        self.dataset = []
        self.dataset_dict = None
        self.scaler = MinMaxScaler(feature_range=frange)
        # Filled in by scale_dataset().
        self.scaled_data = None
        self.scaled_dlen = 0
        # Filled in by create_test_train_dataset().
        self.X_train = None
        self.y_train = None
        self.X_test = None
        self.y_test = None
        # Older attribute names; never assigned anywhere else in this class,
        # kept so code that reads them still finds None.
        self.train_X = None
        self.train_y = None
        self.max_time_steps = MAX_TIMESTEPS
        self.training_size_percent = TRAINING_SIZE_PERCENTAGE
        self.default_1 = DEFAULT_ONE
        self.default_0 = DEFAULT_ZERO

    def set_max_time_steps(self, value=120):
        """Override ``max_time_steps``."""
        self.max_time_steps = value

    def set_training_size_percentage(self, value=0.65):
        """Override the training fraction used by ``create_test_train_dataset``."""
        self.training_size_percent = value

    def set_default_one(self, value=1):
        """Override the offset subtracted from the window count in ``create_dataset``."""
        self.default_1 = value

    def set_default_zero(self, value=0):
        """Override the column index read by ``create_dataset``."""
        self.default_0 = value

    def create_dataset(self, dataset, time_step=1):
        """Slice a 2-D array into sliding windows and next-step targets.

        For each start index ``i`` the window is rows ``i .. i+time_step-1``
        of column ``self.default_0`` and the target is row ``i+time_step`` of
        the same column. ``len(dataset) - time_step - self.default_1`` windows
        are produced. Returns ``(windows, targets)`` as numpy arrays.
        """
        windows, targets = [], []
        for i in range(len(dataset) - time_step - self.default_1):
            windows.append(dataset[i:(i + time_step), self.default_0])
            targets.append(dataset[i + time_step, self.default_0])
        return np.array(windows), np.array(targets)

    def get_dataset(self, labels='flags'):
        """Run the query and store the rows as a DataFrame in ``self.dataset``.

        ``labels`` is accepted for signature parity but not used here.
        """
        # Collect into a fresh list so that a second call re-reads the data
        # instead of appending to the DataFrame left by the previous call.
        rows = [row for row in self.plpy.cursor(self.qry)]
        self.dataset = pd.DataFrame(rows)

    def scale_dataset(self, dataset, Label='flags', time_step=100):
        """Fit the scaler on one column and keep the scaled values.

        dataset   -- DataFrame holding the column
        Label     -- column name to scale
        time_step -- accepted but not used here
        """
        self.scaled_data = self.scaler.fit_transform(
            dataset[Label].values.reshape(-1, 1)
        )
        self.scaled_dlen = len(self.scaled_data)

    def create_test_train_dataset(self, time_step=100):
        """Split the scaled series and window both parts for the LSTM.

        The first ``training_size_percent`` of the rows form the training
        split, the rest the test split. Results are stored in ``X_train``,
        ``y_train``, ``X_test`` and ``y_test``; the X arrays are reshaped to
        ``(samples, time_step, 1)``.

        Raises RuntimeError when ``scale_dataset`` has not been called.
        """
        if self.scaled_data is None:
            raise RuntimeError("scale_dataset() must be called before create_test_train_dataset()")
        # Use the instance setting so set_training_size_percentage() takes
        # effect; the module constant is only the initial default.
        training_size = int(self.scaled_dlen * self.training_size_percent)
        train_data = self.scaled_data[0:training_size, :]
        test_data = self.scaled_data[training_size:self.scaled_dlen, :1]

        self.X_train, self.y_train = self.create_dataset(train_data, time_step)
        self.X_test, self.y_test = self.create_dataset(test_data, time_step)
        self.X_train = self.X_train.reshape(self.X_train.shape[0], self.X_train.shape[1], 1)
        self.X_test = self.X_test.reshape(self.X_test.shape[0], self.X_test.shape[1], 1)

    def makemodel(self, ltunits=50, time_step=100):
        """Build and compile a three-layer stacked LSTM.

        ltunits   -- units per LSTM layer
        time_step -- window length, used for the input shape ``(time_step, 1)``

        Returns the compiled model (Adam optimizer, mean squared error loss).
        """
        m = tf.keras.models.Sequential()
        m.add(tf.keras.layers.LSTM(units=ltunits, return_sequences=True, input_shape=(time_step, 1)))
        m.add(tf.keras.layers.Dropout(0.2))
        m.add(tf.keras.layers.LSTM(units=ltunits, return_sequences=True))
        m.add(tf.keras.layers.Dropout(0.2))
        m.add(tf.keras.layers.LSTM(units=ltunits))
        m.add(tf.keras.layers.Dropout(0.2))
        m.add(tf.keras.layers.Dense(units=1))  # single predicted next value
        m.compile(optimizer='adam', loss='mean_squared_error')
        return m