Product Features
...
Machine Learning Models
Model Types
Machine Learning Classification
1min
Machine Learning Classification uses models created in TensorFlow. See Upload a Model for more information.
The TensorFlow Images Processor feeds images to an already created TensorFlow Model.
The TensorFlow Processor feeds timeseries data to an aready created TensorFlow Model.
This use case is a customized classification version of a CNN classification model from the TensorFlow website.
# cnn model
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
# load a single file as a numpy array
def load_file(filepath):
dataframe = read_csv(filepath, header=None, delim_whitespace=True)
return dataframe.values
# load a list of files and return as a 3d numpy array
def load_group(filenames, prefix=''):
loaded = list()
for name in filenames:
data = load_file(prefix + name)
loaded.append(data)
# stack group so that features are the 3rd dimension
loaded = dstack(loaded)
return loaded
# load a dataset group, such as train or test
def load_dataset_group(group, prefix=''):
filepath = prefix + group + '/Inertial Signals/'
# load all 9 files as a single array
filenames = list()
# total acceleration
filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
body acceleration
filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
# body gyroscopefilenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
# load input data
X = load_group(filenames, filepath)
load class output
y = load_file(prefix + group + '/y_'+group+'.txt')
return X, y
# load the dataset, returns train and test X and y elements
def load_dataset(prefix=''):
# load all train
trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
print(trainX.shape, trainy.shape)
# load all test
testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
print(testX.shape, testy.shape)
zero-offset class values
trainy = trainy - 1
testy = testy - 1
one hot encode y
trainy = tf.keras.utils.to_categorical(trainy)
testy = tf.keras.utils.to_categorical(testy)
print(trainX.shape, trainy.shape, testX.shape, testy.shape)
return trainX, trainy, testX, testy
# standardize data
def scale_data(trainX, testX, standardize):
# remove overlap
cut = int(trainX.shape[1] / 2)
longX = trainX[:, -cut:, :]
# flatten windows
longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
# flatten train and test
flatTrainX = trainX.reshape((trainX.shape[0] * trainX.shape[1], trainX.shape[2]))
flatTestX = testX.reshape((testX.shape[0] * testX.shape[1], testX.shape[2]))
# standardize
if standardize:
s = StandardScaler()
# fit on training data
s.fit(longX)
# apply to training and test data
longX = s.transform(longX)
flatTrainX = s.transform(flatTrainX)
flatTestX = s.transform(flatTestX)
# reshape
flatTrainX = flatTrainX.reshape((trainX.shape))
flatTestX = flatTestX.reshape((testX.shape))
return flatTrainX, flatTestX
# fit and evaluate a model
def evaluate_model(trainX, trainy, testX, testy, param, n_filters, kernal_size):
verbose, epochs, batch_size = 0, 10, 32
n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
# scale data
trainX, testX = scale_data(trainX, testX, param)
model = keras.Sequential()
model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernal_size, activation='relu', input_shape=(n_timesteps,n_features)))
model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernal_size, activation='relu'))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(100, activation='relu'))
model.add(tf.keras.layers.Dense(n_outputs, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
# fit network
model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
# evaluate model
_, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
return accuracy, model
# summarize scores
def summarize_results(scores, params):
print(scores, params)
summarize mean and standard deviation
for i in range(len(scores)):
m, s = mean(scores[i]), std(scores[i])
print('Param=%s: %.3f%% (+/-%.3f)' % (params[i], m, s))
# boxplot of scores
# plt.boxplot(scores, labels=params)
# plt.savefig('exp_cnn_standardize.png')
# run an experiment
def run_experiment(params, repeats=1):
# load data
trainX, trainy, testX, testy = load_dataset()
# test each parameter
all_scores = list()
for p in params:
# repeat experiment
scores = list()
model = keras.Sequential()
for r in range(repeats):
score, model = evaluate_model(trainX, trainy, testX, testy, p, n_filters=64, kernal_size=3)
score = score * 100.0
model.summary()
# if p:
# model.save("/motionModel/")
yy = model.predict(trainX)
print(np.round(yy,3))
print(testy)
print('>p=%s #%d: %.3f' % (p, r+1, score))
scores.append(score)
all_scores.append(scores)
# summarize results
summarize_results(all_scores, params)
### run the experiment
n_params = [False, True]
run_experiment(n_params)
# plt.show()