Product Features
...
Machine Learning Models
Model Types

Machine Learning Classification

1min

Machine Learning Classification uses models created in TensorFlow. See Upload a Model for more information.

The TensorFlow Images Processor feeds images to an already created TensorFlow Model.

The TensorFlow Processor feeds time-series data to an already created TensorFlow Model.

This use case is a customized classification version of a CNN classification model from the TensorFlow website.

# cnn model
from numpy import mean
from numpy import std
from numpy import dstack
from pandas import read_csv
import numpy as np
import tensorflow as tf
from tensorflow import keras
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt


def load_file(filepath):
    """Load a single whitespace-delimited, header-less file as a numpy array."""
    # sep=r'\s+' replaces delim_whitespace=True, which is deprecated in
    # pandas >= 2.2; both split columns on runs of whitespace.
    dataframe = read_csv(filepath, header=None, sep=r'\s+')
    return dataframe.values


def load_group(filenames, prefix=''):
    """Load a list of files and return them as a single 3d numpy array.

    Each file is read with load_file(prefix + name); the resulting arrays are
    stacked so that the files (features) form the 3rd dimension.
    """
    loaded = [load_file(prefix + name) for name in filenames]
    # stack group so that features are the 3rd dimension
    return dstack(loaded)


def load_dataset_group(group, prefix=''):
    """Load one dataset group, such as 'train' or 'test'.

    Reads the nine signal files under '<prefix><group>/Inertial Signals/' and
    the class file '<prefix><group>/y_<group>.txt'.

    Returns:
        (X, y): the stacked input signals and the class output array.
    """
    filepath = prefix + group + '/Inertial Signals/'
    # load all 9 files as a single array
    filenames = list()
    # total acceleration
    filenames += ['total_acc_x_'+group+'.txt', 'total_acc_y_'+group+'.txt', 'total_acc_z_'+group+'.txt']
    # body acceleration
    filenames += ['body_acc_x_'+group+'.txt', 'body_acc_y_'+group+'.txt', 'body_acc_z_'+group+'.txt']
    # body gyroscope
    filenames += ['body_gyro_x_'+group+'.txt', 'body_gyro_y_'+group+'.txt', 'body_gyro_z_'+group+'.txt']
    # load input data
    X = load_group(filenames, filepath)
    # load class output
    y = load_file(prefix + group + '/y_'+group+'.txt')
    return X, y


def load_dataset(prefix=''):
    """Load the dataset; returns train and test X and y elements.

    The data is read from '<prefix>HARDataset/'. Class values are shifted to
    start at zero and then one-hot encoded.

    Returns:
        (trainX, trainy, testX, testy)
    """
    # load all train
    trainX, trainy = load_dataset_group('train', prefix + 'HARDataset/')
    print(trainX.shape, trainy.shape)
    # load all test
    testX, testy = load_dataset_group('test', prefix + 'HARDataset/')
    print(testX.shape, testy.shape)
    # zero-offset class values
    trainy = trainy - 1
    testy = testy - 1
    # one hot encode y
    trainy = tf.keras.utils.to_categorical(trainy)
    testy = tf.keras.utils.to_categorical(testy)
    print(trainX.shape, trainy.shape, testX.shape, testy.shape)
    return trainX, trainy, testX, testy


def scale_data(trainX, testX, standardize):
    """Optionally standardize the train and test inputs.

    When `standardize` is truthy, a StandardScaler is fitted on the second
    half of every training window (the original comments describe this as
    removing the overlap between windows) and applied to both sets. When it
    is falsy the data is returned unchanged apart from a reshape round-trip.

    Returns:
        (trainX, testX) with the same shapes as the inputs.
    """
    # remove overlap
    cut = int(trainX.shape[1] / 2)
    longX = trainX[:, -cut:, :]
    # flatten windows
    longX = longX.reshape((longX.shape[0] * longX.shape[1], longX.shape[2]))
    # flatten train and test
    flatTrainX = trainX.reshape((trainX.shape[0] * trainX.shape[1], trainX.shape[2]))
    flatTestX = testX.reshape((testX.shape[0] * testX.shape[1], testX.shape[2]))
    # standardize
    if standardize:
        s = StandardScaler()
        # fit on training data
        s.fit(longX)
        # apply to training and test data
        flatTrainX = s.transform(flatTrainX)
        flatTestX = s.transform(flatTestX)
    # reshape back to the original 3d shapes
    flatTrainX = flatTrainX.reshape(trainX.shape)
    flatTestX = flatTestX.reshape(testX.shape)
    return flatTrainX, flatTestX


def evaluate_model(trainX, trainy, testX, testy, param, n_filters, kernal_size):
    """Fit and evaluate a 1D CNN classifier.

    Args:
        trainX, trainy: training inputs and one-hot encoded labels.
        testX, testy: test inputs and one-hot encoded labels.
        param: passed to scale_data as the `standardize` flag.
        n_filters: number of filters in each Conv1D layer.
        kernal_size: kernel size of each Conv1D layer (spelling kept so that
            existing keyword callers continue to work).

    Returns:
        (accuracy, model): test-set accuracy and the fitted Keras model.
    """
    verbose, epochs, batch_size = 0, 10, 32
    n_timesteps, n_features, n_outputs = trainX.shape[1], trainX.shape[2], trainy.shape[1]
    # scale data
    trainX, testX = scale_data(trainX, testX, param)
    model = keras.Sequential()
    model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernal_size, activation='relu', input_shape=(n_timesteps, n_features)))
    model.add(tf.keras.layers.Conv1D(filters=n_filters, kernel_size=kernal_size, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))
    model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(100, activation='relu'))
    model.add(tf.keras.layers.Dense(n_outputs, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    # fit network
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose)
    # evaluate model
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
    return accuracy, model


def summarize_results(scores, params):
    """Print the raw scores, then the mean and standard deviation per parameter."""
    print(scores, params)
    # summarize mean and standard deviation
    for param, param_scores in zip(params, scores):
        m, s = mean(param_scores), std(param_scores)
        print('Param=%s: %.3f%% (+/-%.3f)' % (param, m, s))
    # boxplot of scores
    # plt.boxplot(scores, labels=params)
    # plt.savefig('exp_cnn_standardize.png')


def run_experiment(params, repeats=1):
    """Run the experiment: train and score one model per parameter value.

    Args:
        params: values passed to evaluate_model as `param` (the standardize
            flag), one experiment per value.
        repeats: number of times each experiment is repeated.
    """
    # load data
    trainX, trainy, testX, testy = load_dataset()
    # test each parameter
    all_scores = list()
    for p in params:
        # The model is scaled inside evaluate_model; recompute the same
        # (deterministic) scaling here so predictions see matching input.
        _, scaled_testX = scale_data(trainX, testX, p)
        # repeat experiment
        scores = list()
        for r in range(repeats):
            score, model = evaluate_model(trainX, trainy, testX, testy, p, n_filters=64, kernal_size=3)
            score = score * 100.0
            model.summary()
            # To export the standardized model for upload, uncomment:
            # if p:
            #     model.save("/motionModel/")
            # Predict on the test set so each printed row of probabilities
            # corresponds to the same row of testy printed below.
            yy = model.predict(scaled_testX)
            print(np.round(yy, 3))
            print(testy)
            print('>p=%s #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
    # summarize results
    summarize_results(all_scores, params)


### run the experiment
n_params = [False, True]
run_experiment(n_params)
# plt.show()