An example of how to use the Estimator API with TensorFlow

One of our clients, a world leader in security solutions, offers a wide range of solutions aimed at numerous sectors and customer segments, from small businesses to large industrial complexes. One of its most outstanding services is the Operation Center (also known as the SOC), which aims to provide cutting-edge, differentiating solutions through the use of the most innovative technologies and the continuous supervision of its customers.

Among other things, the SOC is responsible for receiving and managing fault messages from its surveillance devices. As part of managing these notices, a SOC operator has to decide where to send each notification: either to a room technician, who tries to solve the problem remotely, or to a field technician, who solves the problem by physically traveling to the damaged device.

The aim of this solution is to build a reliable predictive model that could later power a tool to assist the SOC operator in deciding where to route each fault, improving the success ratio and therefore generating savings for our client.

Data pipeline: dataset analysis, processing and training the model

One of the most important and difficult steps in a data project is data acquisition: here the data arrives as .csv files, but it has to go through an ETL process before it can be used properly. At a high level, the data pipeline flows as follows:

  • The original dataset is uploaded into a bucket in .csv files

  • Once the data is stored in Cloud Storage, a Cloud Function is triggered as part of the ETL process. The Cloud Function covers several tasks, but the most important part of the process is shown in the following snippet:

    # Dataset preprocessing phase.
    df = preprocess_dataset(df)
    # Dataset analysis and processing phase.
    df = analyze_and_process_dataset(df, bucket, local_tmp_path, column_codes_path, dataset_training_path, dataset_original_name)
    # Model training on ML Engine. The call name is not shown in the original
    # snippet; train_model_in_ml_engine is a hypothetical placeholder.
    train_model_in_ml_engine(
        'gs://' + bucket_name + '/' + ml_engine_job_info_path,
        'gs://' + bucket_name + '/' + dataset_training_path + 'data-' + dataset_original_name,
        'gs://' + bucket_name + '/' + dataset_training_path + 'test-' + dataset_original_name)
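
As a rough illustration of the trigger described above, the following minimal sketch shows how a storage-triggered function might derive the training and test locations before running the three phases. The (event, context) signature is the one used by Python background Cloud Functions; the names on_csv_uploaded and build_training_uris, as well as the example bucket and path values, are assumptions made for illustration and not the project's actual code.

```python
def build_training_uris(bucket_name, dataset_training_path, dataset_original_name):
    """Return the (train, test) gs:// URIs, following the naming used in the snippet."""
    base = 'gs://' + bucket_name + '/' + dataset_training_path
    return (base + 'data-' + dataset_original_name,
            base + 'test-' + dataset_original_name)


def on_csv_uploaded(event, context):
    """Hypothetical entry point for a Cloud Storage upload trigger."""
    object_name = event['name']
    if not object_name.endswith('.csv'):
        return None  # ignore anything that is not a dataset upload
    dataset_original_name = object_name.split('/')[-1]
    # The preprocessing, analysis and training phases would run here.
    return build_training_uris(event['bucket'], 'dataset-training/', dataset_original_name)


# Example with a fake event (bucket and object names are made up)
uris = on_csv_uploaded({'bucket': 'my-bucket', 'name': 'uploads/mmm.csv'}, None)
ignored = on_csv_uploaded({'bucket': 'my-bucket', 'name': 'notes.txt'}, None)
print(uris)
```

Returning early for non-.csv objects keeps the function from launching a training job every time an unrelated file lands in the bucket.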


As you can see, the data is first preprocessed, and the resulting pandas DataFrames are generated and stored in Cloud Storage (step 3). The preprocessed dataset is then analyzed and processed, and the column encoding files and the datasets are stored in Cloud Storage (step 4) so that the model can be trained and evaluated (step 5).

All the Cloud Functions are based on previous work done in local notebooks with Datalab.

The first important step in the Cloud Function is the initial data preprocessing, where:

  • Some unnecessary fields are removed
    # Drop columns with very scattered initial values.
    df = df.drop(['alarm_incident_full_clear_opactdisp_id', 'alarm_incident_full_clear_emp_no', 'event_history_test_seqno', 'site_install_servco_no'], axis=1)
    df = df.reset_index(drop=True)
    # Drop columns whose content does not contribute
    df = df.drop(['event_history_event_id', 'alarm_incident_status', 'comment'], axis=1)


  • Some other fields are preprocessed (missing values are filled in or values are reformatted):
    # Replace empty strings with NaN values
    empty_to_nan_columns = [
        'alarm_incident_delay_seconds', 'system_systype_id', 'site_site_no',
        'site_sitetype_id', 'site_sitestat_id', 'alarm_incident_status',
        'system_Nzonas', 'site_Nvias', 'site_cspart_no', 'site_siteloc_id',
        'alarm_incident_alarminc_no', 'comment']
    df[empty_to_nan_columns] = df[empty_to_nan_columns].replace('', np.nan)
    # Remove blank spaces from columns with identifiers
    id_columns = ['system_systype_id', 'site_sitetype_id', 'site_sitestat_id',
                  'event_history_event_id', 'site_siteloc_id']
    for column in id_columns:
        df[column] = df[column].str.strip()

    df = df.reset_index(drop=True)
  • While analyzing the dataset, new variables are derived from the original data.
    # Process the date to obtain the day of the week, month and season of the year
    df['event_history_event_date'] = pd.to_datetime(df['event_history_event_date'])
    # dt.day_name() replaces dt.weekday_name, which was removed in pandas 1.0
    df['day_of_week'] = df['event_history_event_date'].dt.day_name()
    df['month'] = df['event_history_event_date'].dt.month
    # Drop every column whose name contains 'date'
    df = df.drop(list(df.filter(regex='date')), axis=1)
    # Season code: 0 for Mar-May, 1 for Jun-Aug, 2 for Sep-Nov, 3 for the remaining months
    df['season'] = 3
    df.loc[df['month'].isin([3, 4, 5]), 'season'] = 0
    df.loc[df['month'].isin([6, 7, 8]), 'season'] = 1
    df.loc[df['month'].isin([9, 10, 11]), 'season'] = 2
  • During processing, the column encoding files and the datasets are stored in Cloud Storage so that the model can be trained and evaluated.
    # Encode text variables as unique identifiers and store the encodings in Cloud Storage for use by other CFs
    encoded_columns = [
        ('system_systype_id', 'systypeId_cod.pkl'),
        ('site_sitetype_id', 'sitetypeId_cod.pkl'),
        ('site_sitestat_id', 'sitestatId_cod.pkl'),
        ('site_siteloc_id', 'sitelocId_cod.pkl'),
        ('day_of_week', 'dayofweek_cod.pkl'),
        ('site_site_no', 'siteNo_cod.pkl'),
        ('site_cspart_no', 'cspartNo_cod.pkl'),
        ('system_system_no', 'systemNo_cod.pkl'),
    ]
    for column, pickle_name in encoded_columns:
        df = encode_values_to_pickle_and_upload_to_cloud_storage(df, bucket, local_tmp_path, column, column_codes_path, pickle_name)
        # Replace the original column with its coded version
        df[column] = df[column + '_coded']
        df = df.drop(column + '_coded', axis=1)
  • Once the data has been analyzed, 85% of the dataset is uploaded to Cloud Storage as training data and the remaining 15% is uploaded as evaluation data.
    # Generate the training and test sets for the model
    cut = int(np.round(0.85 * df.shape[0]))
    upload_in_csv_to_cloud_storage(df.iloc[:cut], bucket, local_tmp_path, datasets_training_path, 'data-' + datasets_training_name)
    # Start the test set at `cut` (not `cut + 1`) so that no row is skipped
    upload_in_csv_to_cloud_storage(df.iloc[cut:], bucket, local_tmp_path, datasets_training_path, 'test-' + datasets_training_name)
  • Finally, to verify that the current model is still valid for the new dataset and that no variable needs to be reviewed, we create and send a basic job (of the training type) to ML Engine again, using the generated training and test datasets.
    import logging
    import time
    from googleapiclient import discovery
    from googleapiclient.errors import HttpError

    project_id = 'projects/{}'.format(project_name)
    job_name = 'origen_ticketing_tf_pipeline_trigger_' + time.strftime('%Y%m%d_%H%M%S')
    training_inputs = {
        'runtimeVersion': '1.10',
        'jobDir': job_dir + job_name,
        'packageUris': package_uris,
        'pythonModule': 'trainer.task',
        'region': 'europe-west1',
        'args': [
            '--train-file', train_file,
            '--eval-file', eval_file
        ]
    }
    job_spec = {'jobId': job_name, 'trainingInput': training_inputs}
    cloudml = discovery.build('ml', 'v1')
    request = cloudml.projects().jobs().create(body=job_spec, parent=project_id)
    try:
        response = request.execute()
    except HttpError as err:
        # Do whatever error response is appropriate for your application.
        # For this example, just send some text to the logs.
        logging.error('There was an error creating the training job. Check the details:')
        logging.error(err)
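
The encoding step in the list above relies on a project helper whose body is not shown. A minimal, pandas-free sketch of the idea follows. It assumes the helper simply assigns one integer code per distinct value and persists the table with pickle so that other functions can reuse it; the function names build_codes and save_codes and the sample values are hypothetical.

```python
import os
import pickle
import tempfile


def build_codes(values):
    """Map each distinct value to an integer code, in order of first appearance."""
    codes = {}
    for value in values:
        if value not in codes:
            codes[value] = len(codes)
    return codes


def save_codes(codes, path):
    """Persist the table so another function can reuse the same codes later."""
    with open(path, 'wb') as handle:
        pickle.dump(codes, handle)


days = ['Monday', 'Tuesday', 'Monday', 'Friday']
day_codes = build_codes(days)
encoded = [day_codes[d] for d in days]

# Round-trip through a temporary file to mimic the stored .pkl table
path = os.path.join(tempfile.gettempdir(), 'dayofweek_cod_example.pkl')
save_codes(day_codes, path)
with open(path, 'rb') as handle:
    reloaded = pickle.load(handle)
print(encoded, reloaded == day_codes)
```

Persisting the table matters because the prediction service must apply exactly the same codes that were used at training time.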

TensorFlow model

To build a learning model able to represent the aforementioned data and make predictions accordingly, we decided to use the premade estimators in TensorFlow. In particular, we used the DNNClassifier class to create a deep neural network classifier. This estimator was chosen because earlier trials on local machines showed that neural networks gave better results than boosted trees and other algorithms. As for other DNN-based techniques, we also tested DNNRegressor, which likewise models deep neural networks, but it gave worse results in our case.

Thus, the first step was to develop a complete library or app whose code structure contains all the necessary functions. To do that, we used the Google Cloud template for ML applications, available on GitHub. These functions deal with the data structure definition, data preprocessing (selection of data types, transformation of categorical variables...), creation of the corresponding deep neural network schema, design of the hyperparameter tuning evaluation and, finally, training of the neural network and its evaluation on the test set. Our code contains the following functions and directory structure:

  • trainer: subdirectory to store the main application module.

  • file used by setuptools to identify directories with code to package.

  • the different data features are treated to convert them to the proper types and structures, some transformation…

  • input files for the training and evaluation sets are gathered from the corresponding Storage buckets and transformed into the tensorflow format.

  • some parameters needed for the code structure running are hardcoded in this function.

  • the machine learning algorithm is chosen and its architecture designed. Also, the training and evaluation experiments are run here.

  • arguments are parsed from the console and the main function resides here.

  • config.yaml: contains the different hyperparameter configurations to test when training the model, in order to maximize its predictive accuracy.

  • script to generate sample JSON instances to test predictions online.

  • File with information of the module

With regard to the neural network architecture definition (based on DNNClassifier), some decisions were made. For the optimizer, Adagrad was chosen because of the sparsity of the data and its improved robustness compared with traditional stochastic gradient descent.

The activation functions are rectified linear units (ReLU) for the hidden layers, the most widely accepted choice in the literature, and the logistic function in the last layer (in order to perform the classification).

Dropout is also used to regularize the network. Finally, some hyperparameters were tuned with Google's implementation of Bayesian optimization: the number of layers in the network, the size scale factor between layers, and the learning rate.

dnn_optimizer = tf.train.AdagradOptimizer(learning_rate=task.HYPER_PARAMS.learning_rate)

estimator = tf.estimator.DNNClassifier(
    n_classes=len(metadata.TARGET_LABELS),
    label_vocabulary=metadata.TARGET_LABELS,

    feature_columns=deep_columns,
    optimizer=dnn_optimizer,

    weight_column=metadata.WEIGHT_COLUMN_NAME,

    hidden_units=construct_hidden_units(),
    activation_fn=tf.nn.relu,
    dropout=task.HYPER_PARAMS.dropout_prob,

    config=config,
)
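
The body of construct_hidden_units() is not shown here. One plausible sketch, assuming each layer is the first layer's size scaled geometrically by the layer-sizes-scale-factor hyperparameter, is given below. The function name construct_hidden_units_sketch, the first-layer size of 64 and the floor of 2 units per layer are assumptions chosen for illustration, not values taken from the project.

```python
def construct_hidden_units_sketch(num_layers, scale_factor, first_layer_size):
    """Return layer sizes that shrink geometrically by `scale_factor`."""
    return [
        max(2, int(first_layer_size * scale_factor ** i))  # never fewer than 2 units
        for i in range(num_layers)
    ]


hidden_units = construct_hidden_units_sketch(
    num_layers=4, scale_factor=0.5, first_layer_size=64)
print(hidden_units)  # prints [64, 32, 16, 8]
```

Parameterizing the architecture with just a depth and a scale factor keeps the tuning search space small compared with tuning each layer size independently.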

Hyperparameters tuning and training of the model

The result of the data pipeline was a final dataset with 173,679 rows and 49 columns, including the label. For the learning phase, it is good practice to split the original dataset into training, evaluation and test sets, so that training, hyperparameter tuning and the final evaluation on unseen data each work with independent data. Thus, the original data was split into 85% for training and evaluation (for hyperparameter tuning) and 15% for testing.
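
For concreteness, the row counts implied by an 85/15 split of this dataset can be worked out as follows. This is only arithmetic on the figures quoted above, assuming the cut index is obtained by rounding, as in the pipeline code.

```python
total_rows = 173679
train_fraction = 0.85

# Index at which the dataset is cut: rows before it train, rows from it onward test
cut = int(round(train_fraction * total_rows))
train_rows = cut
test_rows = total_rows - cut
print(train_rows, test_rows)  # prints 147627 26052
```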

To carry out the hyperparameter tuning, a custom machine configuration was used, since the configuration was optimized step by step to reduce the total processing time of the task. The goal of the algorithm is to maximize the model accuracy, using early stopping to avoid overfitting and a maximum of 5 trials, since we verified that more trials do not bring a significant increase in accuracy. The search ranges for the variables to be optimized were chosen according to previous experience and knowledge of their behavior, resulting in the following configuration:

    trainingInput:
      scaleTier: CUSTOM
      masterType: large_model
      workerType: complex_model_m
      parameterServerType: large_model
      workerCount: 10
      parameterServerCount: 5
      hyperparameters:
        goal: MAXIMIZE
        hyperparameterMetricTag: accuracy
        enableTrialEarlyStopping: True
        maxTrials: 5
        maxParallelTrials: 2
        params:
          - parameterName: num-layers
          - parameterName: layer-sizes-scale-factor
            type: DOUBLE
            minValue: 0.2
            maxValue: 0.8
          - parameterName: learning-rate
            type: DOUBLE
            minValue: 0.0001
            maxValue: 0.01

Some of these settings deserve further detail:

  • Jobs submitted with "--scale-tier STANDARD_1" or "--scale-tier BASIC" took a long time to complete and, in some cases, returned permission and memory errors in the workers. So we decided to customize the machine groups as follows:

  • Master of type large_model: a machine with a lot of memory, especially suited for parameter servers when the model is large.

  • Workers of type complex_model_m: a machine with roughly twice the number of cores and roughly double the memory of complex_model_s.

  • workerCount is the number of worker replicas to use for the training job. We configured 10 workers to support the maximum number of trials (maxTrials) under the configured parallel execution (maxParallelTrials) without blocking.

  • parameterServerCount is the number of parameter server replicas to use for the training job. We configured 5 servers to support the maximum number of trials (maxTrials).

  • The goal (goal) of the algorithm is to maximize the model accuracy (hyperparameterMetricTag), using early stopping (enableTrialEarlyStopping) to avoid overfitting and a maximum of 5 trials (maxTrials).

  • The search ranges for the variables to be optimized (num-layers, layer-sizes-scale-factor, learning-rate) were chosen according to previous experience and knowledge of their behavior.

After completing the maximum number of trials (5), an optimized model was obtained with the following final hyperparameters:

    "hyperparameters": {
        "num-layers": "4",
        "learning-rate": "0.0014479715888709816",
        "layer-sizes-scale-factor": "0.75728686112411281"
    }

And the following final results (even with the maximum of 30 trials that we ran, the result improved by only 1.5 percent):

    "finalMetric": {
        "trainingStep": "1006",
        "objectiveValue": 0.625320017338
    }

Training using Cloud ML Engine

To submit the training and hyperparameter tuning jobs to ML Engine on Google Cloud, the commands must be sent through the console. First, the main variables are defined:

DATE=`date '+%Y%m%d_%H%M%S'`
export JOB_NAME=origen_ticketing_tf_$DATE
export GCS_JOB_DIR=gs://com-soc-projectdata/OrigenTicketingTF/jobinfo/$JOB_NAME

export TRAIN_FILE=gs://com-soc-projectdata/OrigenTicketingTF/dataset-training/data-mmm.csv
export EVAL_FILE=gs://com-soc-projectdata/OrigenTicketingTF/dataset-training/test-mmm.csv

export HPTUNING_CONFIG=config.yaml

export TRAIN_STEPS=2000
export EVAL_STEPS=500

export REGION=europe-west1

The gcloud command for hyperparameter tuning jobs is:

gcloud ml-engine jobs submit training $JOB_NAME \
    --stream-logs \
    --runtime-version 1.10 \
    --config $HPTUNING_CONFIG \
    --job-dir $GCS_JOB_DIR \
    --module-name trainer.task \
    --package-path trainer/ \
    --region $REGION \
    -- \
    --train-file $TRAIN_FILE \
    --eval-file $EVAL_FILE \
    --train-steps $TRAIN_STEPS \
    --eval-steps $EVAL_STEPS 

The gcloud command for regular training jobs (without the tuning configuration) is:

gcloud ml-engine jobs submit training $JOB_NAME \
    --stream-logs \
    --runtime-version 1.10 \
    --job-dir $GCS_JOB_DIR \
    --module-name trainer.task \
    --package-path trainer/ \
    --region $REGION \
    -- \
    --train-file $TRAIN_FILE \
    --eval-file $EVAL_FILE \
    --train-steps $TRAIN_STEPS \
    --eval-steps $EVAL_STEPS 

Evaluation of model

Once the optimized classification model had been trained, its performance had to be tested and validated. To that end, the model was evaluated on the test set (15% of the complete dataset), reaching a final accuracy of 62.53%.

Taking into account the results achieved in this particular study, along with other tests run on a local machine using different parameter values, other learning methods and other input features, the results seem quite good, but there is room for improvement. For example, more feature engineering could be done, making use of TensorFlow's built-in capabilities for dealing directly with categorical variables. In addition, the DNNLinearCombinedClassifier seems quite promising according to the literature, so testing it could be a good option.

The results also compare well with the current operational process for deciding the origin of an issue in the real case behind this demo, which has an accuracy of 50%.

To deploy the model in ML Engine in order to make new predictions, the model has to be uploaded with the necessary console commands. The prediction service is set up as follows:

export MODEL_BINARIES=$GCS_JOB_DIR/export/estimator/<timestamp>

gcloud ml-engine versions create <version> --model origen_ticketing_tf --origin $MODEL_BINARIES --runtime-version 1.10

Inspect the model binaries with the SavedModel CLI. TensorFlow ships with a CLI that allows you to inspect the signature of exported binary files. To do this, run:

saved_model_cli show --dir $MODEL_BINARIES --tag serve --signature_def predict

Online prediction with Cloud-Fn and Web Content

The final stage covered in this demo is the deployment of the learning model so that it is available for online predictions.

First, create a processed sample from the test data:

python $EVAL_FILE sample.json

Then, send prediction requests to the API. To test this out, you can use the gcloud ml-engine predict tool:

gcloud ml-engine predict --model origen_recomm_tf --version <version> --json-instances sample.json

Finally, you should see a response with the predicted labels of the examples!

CLASS_IDS  CLASSES   LOGISTIC              LOGITS                PROBABILITIES
[0]        [u'0.0']  [0.3971346318721771]  [-0.417418509721756]  [0.6028653383255005, 0.3971346318721771]
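
The columns of this response are related in a simple way for a binary classifier: LOGISTIC is the sigmoid of the value in LOGITS, and PROBABILITIES holds one minus that value followed by the value itself. A quick check with the numbers above, in plain Python and independent of TensorFlow:

```python
import math

logit = -0.417418509721756  # value from the LOGITS column


def sigmoid(x):
    """Logistic function: maps a logit to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-x))


p_class_1 = sigmoid(logit)
probabilities = [1.0 - p_class_1, p_class_1]
# The predicted class is the index of the largest probability
predicted_class = probabilities.index(max(probabilities))
print(probabilities, predicted_class)
```

The computed values agree with the LOGISTIC and PROBABILITIES columns up to floating-point precision, and the predicted class index matches CLASS_IDS.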

That is important because the model version used in ML Engine, which is the one exposed to the user to support scalability, is directly linked to the one in Cloud Storage.

As mentioned before, the input variables used by the model are derived from those entered by the user through the user interface. That means we needed to translate the user's inputs into the model's inputs.

In one of the steps mentioned in Section 2 (g), we decided to save some codification tables in order to be able to perform that translation, taking the complete dataset context into account. We therefore defined a custom function in Cloud Functions responsible for this task. Next, we explain the most representative parts of the function:

  1. Read the parameters from the request: this basically consists of reading the inputs provided by the user through the user interface.
    request_json = request.get_json()
    alarm_delay = request_json['alarm_incident_delay_seconds']
    system_no = request_json['system_system_no']
    site_no = request_json['site_site_no']
    n_zonas = request_json['system_Nzonas']
    n_vias = request_json['site_Nvias']
    cspart_no = request_json['site_cspart_no']
    alarminc_no = request_json['alarm_incident_alarminc_no']
    rec_date = request_json['event_history_event_date']
    systype_id = request_json['system_systype_id']
    sitetype_id = request_json['site_sitetype_id']
    sitestat_id = request_json['site_sitestat_id']
    siteloc_id = request_json['site_siteloc_id']


  2. Load the codification tables: containing the equivalences between original values and coded values.
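    # Download each table from Cloud Storage to /tmp, then load it with pandas
    blob = bucket.blob('newModel_fromV3/dayofweek_cod.pkl')
    blob.download_to_filename('/tmp/dayofweek_cod.pkl')
    dow_cod = pandas.read_pickle('/tmp/dayofweek_cod.pkl')
    blob = bucket.blob('newModel_fromV3/sitelocId_cod.pkl')
    blob.download_to_filename('/tmp/sitelocId_cod.pkl')
    siteloc_cod = pandas.read_pickle('/tmp/sitelocId_cod.pkl')
    blob = bucket.blob('newModel_fromV3/sitestatId_cod.pkl')
    blob.download_to_filename('/tmp/sitestatId_cod.pkl')
    sitestat_cod = pandas.read_pickle('/tmp/sitestatId_cod.pkl')
    blob = bucket.blob('newModel_fromV3/sitetypeId_cod.pkl')
    blob.download_to_filename('/tmp/sitetypeId_cod.pkl')
    sitetype_cod = pandas.read_pickle('/tmp/sitetypeId_cod.pkl')
    blob = bucket.blob('newModel_fromV3/systypeId_cod.pkl')
    blob.download_to_filename('/tmp/systypeId_cod.pkl')
    systype_cod = pandas.read_pickle('/tmp/systypeId_cod.pkl')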


  3. Obtain and store the coded values and generate new variables when needed: as happens with those inferred from the date variable.
    # Get the "system_systype_id" codification
    syst_c = systype_cod[systype_cod['system_systype_id'] == str(systype_id)]
    new_syst = -1  # Default
    if syst_c.size > 0:
        new_syst = syst_c.iloc[0]['system_systype_id_coded']
    # Get the "site_sitetype_id" codification
    sitet_c = sitetype_cod[sitetype_cod['site_sitetype_id'] == str(sitetype_id)]
    new_site = -1  # Default
    if sitet_c.size > 0:
        new_site = sitet_c.iloc[0]['site_sitetype_id_coded']
    # Get the "site_sitestat_id" codification
    sitest_c = sitestat_cod[sitestat_cod['site_sitestat_id'] == str(sitestat_id)]
    new_sitest = -1  # Default
    if sitest_c.size > 0:
        new_sitest = sitest_c.iloc[0]['site_sitestat_id_coded']
    # Get the "site_siteloc_id" codification
    sitel_c = siteloc_cod[siteloc_cod['site_siteloc_id'] == str(siteloc_id)]
    new_sitel = -1  # Default
    if sitel_c.size > 0:
        new_sitel = sitel_c.iloc[0]['site_siteloc_id_coded']
    # Get the "event_history_event_date" codification
    dateDF = pandas.DataFrame(columns=['date'])
    dateDF.loc[0] = [rec_date]
    # dt.day_name() replaces dt.weekday_name, which was removed in pandas 1.0
    day = pandas.to_datetime(dateDF['date']).dt.day_name().iloc[0]
    month = pandas.to_datetime(dateDF['date']).dt.month.iloc[0]
    day_c = dow_cod[dow_cod['day_of_week'] == day]
    new_day = day_c.iloc[0]['day_of_week_coded']
    # Season code, matching the one used when building the training set
    if month in (3, 4, 5):
        season = 0
    elif month in (6, 7, 8):
        season = 1
    elif month in (9, 10, 11):
        season = 2
    else:
        season = 3


  4. Build the new instance: which will be used to obtain a prediction from the model.
    instance = [str(alarm_delay), str(system_no), str(site_no), str(n_zonas), str(n_vias), str(cspart_no), str(alarminc_no), str(month), str(season), str(new_syst), str(new_site), str(new_sitest), str(new_sitel), str(new_day)]


  5. Send the prediction request.
    prediction = send_to_predict_in_ml_engine('securitas-ml-starter', 'origen_ticketing_sl', instance, 'v3')


  6. Return the result given by the model.
    # Return the result of the predictions
    if 'error' in prediction:
        return (flask.jsonify(prediction['error']), 500, headers)
    return (flask.jsonify(prediction['predictions']), 200, headers)
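
Steps 3 and 4 above can be condensed into a small, dependency-free sketch. It assumes the codification tables have been loaded into plain dictionaries (the real function uses pandas DataFrames); the names lookup_code and season_for_month, the sample table and the sample date are hypothetical.

```python
from datetime import datetime


def lookup_code(table, value, default=-1):
    """Return the stored code for `value`, or `default` if it was never seen in training."""
    return table.get(str(value), default)


def season_for_month(month):
    """Season code: 0 for Mar-May, 1 for Jun-Aug, 2 for Sep-Nov, 3 otherwise."""
    if month in (3, 4, 5):
        return 0
    if month in (6, 7, 8):
        return 1
    if month in (9, 10, 11):
        return 2
    return 3


systype_codes = {'A1': 0, 'B2': 1}  # made-up codification table
rec_date = datetime(2018, 7, 14)    # made-up event date

features = [
    rec_date.month,
    season_for_month(rec_date.month),
    lookup_code(systype_codes, 'B2'),
    lookup_code(systype_codes, 'ZZ'),  # unseen value falls back to -1
]
# The model expects every feature as a string, as in step 4
instance = [str(f) for f in features]
print(instance)  # prints ['7', '1', '1', '-1']
```

Falling back to a default code for unseen values lets the service answer requests that contain identifiers missing from the training data instead of failing.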

