Implementation of a custom estimator in TensorFlow

Our client is an international aerospace engineering company that provides acquisition, testing and distribution services for components and advanced electronic systems.

The challenge: offer a usable classification and search system for the more than 15 million components in its database. To meet it, we implemented a recommendation system that, applying matrix factorization techniques in TensorFlow, makes suggestions to each user based on the navigation of past users, making navigation easier for future users.
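Before diving into the pipeline, here is the intuition behind matrix factorization: each user and each component is represented by a small latent vector, and the predicted affinity between a user and a component is the dot product of the two. The toy NumPy sketch below only illustrates the idea; it is not the production TensorFlow model.

import numpy as np

# Toy illustration of matrix factorization (not the production model):
# every user and every component gets a K-dimensional latent vector,
# and the predicted affinity is the dot product of the two vectors.
n_users, n_components, k = 4, 6, 3
user_factors = np.random.rand(n_users, k)
component_factors = np.random.rand(n_components, k)

# Predicted interaction matrix: one score per (user, component) pair
scores = user_factors @ component_factors.T

# Recommend the two highest-scoring components for user 0
top_two = np.argsort(-scores[0])[:2]
print(top_two)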

Data pipeline: dataset analysis and preprocessing 

The first obstacle was working with aggregated data, since the client's information was originally provided as .json files containing nested fields. We therefore had to convert all of this data into a format that allowed us to work with it in an easy and flexible way.

The biggest problem was that we received data from several servers every day. The company has more than one server in production behind an internal auto-scaling solution, so we could receive N .json files per day. With this data, the company wanted to achieve two main objectives:

  • A dashboard with statistics of the searches performed
  • Personalized component recommendations for each user

Data acquisition is one of the most important and difficult steps in a machine learning project, and in this case for two reasons: the data arrives as .json files from different servers, and it contains confidential information. The data therefore had to be processed first to anonymize it, and then transformed so it could be used later.

  • Every day, each server uploads a .json file with all of its information to a Cloud Storage bucket. Because this information is sensitive, the bucket is accessible only to the admin of the GCP console, following the usual security considerations.
  • A Cloud Function is then triggered to anonymize the data, so that no sensitive data is handled downstream. Once the .json files are de-identified, they are stored in another Cloud Storage bucket (step 3). The data is not transformed during this process; the only aim of the Cloud Function is to de-identify it by applying a primitive transformation to some fields (e.g. IP, email) using the Cloud Data Loss Prevention API, as in the snippet and the Cloud Function sketch below.
# Construct deidentify configuration dictionary
deidentify_config = {
    'info_type_transformations': {
        'transformations': [
            {
                'primitive_transformation': {
                    'crypto_hash_config': {
                        'crypto_key': {
                            'unwrapped': {
                                'key': crypto_hash_key
                            }
                        }
                    }
                }
            }
        ]
    }
}

# Construct item
item = {'value': content}

# Call the API
response = dlp.deidentify_content(
    parent, inspect_config=inspect_config,
    deidentify_config=deidentify_config, item=item)
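For context, below is a minimal sketch of how this de-identification can be wrapped in a Cloud Function triggered by the raw bucket. The function name, the bucket name and the deidentify_with_dlp wrapper (which would contain the DLP call above) are our assumptions, not the exact production code.

from google.cloud import storage

DEIDENTIFIED_BUCKET = 'deidentified-bucket'  # placeholder bucket name

def deidentify_uploaded_file(data, context):
    """Hypothetical background Cloud Function: triggered when a .json file
    lands in the restricted raw bucket, it de-identifies the content and
    writes the result to a second bucket."""
    storage_client = storage.Client()

    # Read the uploaded .json from the restricted bucket
    raw_blob = storage_client.bucket(data['bucket']).blob(data['name'])
    content = raw_blob.download_as_string().decode('utf-8')

    # De-identify with the DLP call shown above (hypothetical wrapper)
    deidentified = deidentify_with_dlp(content)

    # Store the de-identified file in the second bucket (step 3)
    clean_blob = storage_client.bucket(DEIDENTIFIED_BUCKET).blob(data['name'])
    clean_blob.upload_from_string(deidentified)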
  • In this step there are two Cloud Functions, one per objective: each performs the ETL process its objective needs, and the processed data is then stored in BigQuery (step 5), both because we must keep adding data and because one dataset is used from Data Studio (search statistics) while the other is used by the data model.
  • The first Cloud Function transforms and stores the data related to the searches, so that Data Studio can work with that information.
client = bigquery.Client()

rows_to_insert = [
    (instance['jsessionId'], instance['sessionId'], instance['date'], instance['time'],
     instance['refererURL'], instance['origin'], instance['searchPosition'], instance['searchId'],
     instance['ip'], instance['application_quality'], instance['application_radiationMinimum'],
     instance['application_radiationMaximum'], instance['application_typeOfSatellite'],
     instance['geoIp_regionCode'], instance['geoIp_countryName'], instance['geoIp_status'],
     instance['geoIp_continentCode'], instance['geoIp_regionName'], instance['geoIp_countryCode'],
     instance['geoIp_longitude'], instance['geoIp_latitude'], instance['geoIp_city'],
     instance['device_os'], instance['device_manufacturer'], instance['device_group'],
     instance['device_type'], instance['device_browser_renderingEngine'],
     instance['device_browser_manufacturer'], instance['device_browser_name'],
     instance['device_browser_majorVersion'], instance['device_browser_minorVersion'],
     instance['device_browser_group'], instance['device_browser_type'], instance['device_browser_version'],
     instance['originalComponent_componentNumber'], instance['originalComponent_style'],
     instance['originalComponent_family'], instance['originalComponent_maker'],
     instance['originalComponent_familyPath'], instance['originalComponent_familyId'],
     instance['originalComponent_componentId'], instance['originalComponent_nasaGroup'],
     instance['originalComponent_nasaSection'], instance['originalComponent_qualified'],
     instance['relatedComponent_componentNumber'], instance['relatedComponent_style'],
     instance['relatedComponent_family'], instance['relatedComponent_maker'],
     instance['relatedComponent_familyPath'], instance['relatedComponent_familyId'],
     instance['relatedComponent_componentId'], instance['relatedComponent_nasaGroup'],
     instance['relatedComponent_nasaSection'], instance['relatedComponent_qualified'],
     instance['user_userId'], instance['user_email'])
]

# API request
table_ref = client.dataset(bq_dataset).table(bq_table)
table = client.get_table(table_ref)

client.insert_rows(table, rows_to_insert)
  • The second Cloud Function transforms and stores the data related to each component, so it can be used as the dataset for the model. It follows the same BigQuery insert pattern shown above.
  • As we mentioned before, the main reasons to store the processed data in BigQuery are:
  • One dataset is used in Data Studio, and we needed to run SQL queries on it (see the example after this list).
  • For the model, Cloud Storage was an option, but what tipped the decision towards BigQuery is that we needed to keep appending data, and BigQuery's streaming inserts (the insert_rows calls above) allow exactly that.
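As an illustration of the first point, the kind of aggregation a Data Studio chart needs is plain SQL over the search table. The snippet below is a hypothetical example (the table name and grouping fields are ours), using the same read_gbq helper that appears later in the pipeline; project_name is defined elsewhere in the pipeline code.

import pandas as pd

# Hypothetical dashboard-style aggregation over the search data
query = """
    SELECT date, geoIp_countryName, COUNT(*) AS num_searches
    FROM dataset.searches
    GROUP BY date, geoIp_countryName
"""
searches_per_country = pd.io.gbq.read_gbq(query, project_id=project_name)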

Data Selection and Exploration 

Once the data is stored in Cloud Storage, a Cloud Function is triggered as part of the ETL process. The Cloud Function serves several purposes, but the most important process can be summarized in the following snippet:

# Load dataset from BigQuery
df = pd.io.gbq.read_gbq("""SELECT * FROM dataset.component""", project_id=project_name)

# Preprocessing phase of the dataset (remove columns with too many missing values)
df = preprocess_dataset(df, threshold)

# Analysis and processing phase of the dataset (encode IDs, build train/test files)
df = analyze_and_process_dataset(df, bucket, local_tmp_path, column_codes_path,
                                 dataset_training_path, dataset_original_name, percenttrain)

# Model training on ML Engine
train_model_in_ml_engine(
    project_name,
    'gs://' + bucket_name + '/' + ml_engine_job_info_path,
    ml_engine_package_uris.split(','),
    'gs://' + bucket_name + '/' + dataset_training_path + 'data-' + dataset_original_name,
    'gs://' + bucket_name + '/' + dataset_training_path + 'test-' + dataset_original_name)

As you can observe, the data is imported and pre-processed, removing columns with missing values. The pre-processed dataset is then analyzed and prepared, encoding the user and component identifiers into ordered numbers and storing the data frame files for training and evaluation in Cloud Storage. Finally, the recommender system is trained on ML Engine.
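The train_model_in_ml_engine helper seen above is not reproduced in this post, but a sketch of what such a helper could look like, submitting a training job through the ML Engine API with google-api-python-client, is shown below; the job id, Python module, region and runtime version are assumptions on our part.

import time
from googleapiclient import discovery

def train_model_in_ml_engine(project_name, job_dir, package_uris,
                             train_data_path, eval_data_path):
    # Sketch only: submits a training job to ML Engine; the module name,
    # region and runtime version below are assumptions.
    training_inputs = {
        'packageUris': package_uris,
        'pythonModule': 'trainer.task',   # assumed trainer module
        'region': 'europe-west1',         # assumed region
        'jobDir': job_dir,
        'runtimeVersion': '1.13',         # assumed runtime version
        'args': ['--train-files', train_data_path,
                 '--eval-files', eval_data_path],
    }
    job_spec = {
        'jobId': 'recommender_training_{}'.format(int(time.time())),
        'trainingInput': training_inputs,
    }

    ml = discovery.build('ml', 'v1')
    ml.projects().jobs().create(
        parent='projects/{}'.format(project_name),
        body=job_spec).execute()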

Cloud Function - Data preprocessing

The Cloud Function is the first step of the data preprocessing, where:

  • Empty strings are converted into missing values 
# Convert empty strings to missing values
df_initial = df.replace('', np.nan, regex=True)
  • Columns with more missing values than a desired threshold are removed
# Obtain number of missing values
tab_info = pd.DataFrame(df_initial.dtypes).T.rename(index={0: 'column type'})
tab_info = tab_info.append(pd.DataFrame(df_initial.isnull().sum()).T.rename(index={0: 'null values (number)'}))
tab_info = tab_info.append(pd.DataFrame(df_initial.isnull().sum() / df_initial.shape[0] * 100).T.rename(index={0: 'null values (%)'}))

# Remove columns with more missing values than a pre-defined percentage threshold
data_colOut = df_initial.copy()

for x in range(0, len(df_initial.columns.values)):
    if tab_info.values[2][x] > float(threshold):
        data_colOut = data_colOut.drop(df_initial.columns.values[x], axis=1)

  • Other undesired features are also removed
# Remove undesired features
toRemove = ['device_browser_type', 'device_browser_group', 'device_browser_minorVersion',
            'device_browser_name', 'device_browser_manufacturer', 'device_browser_majorVersion',
            'device_browser_version', 'device_browser_renderingEngine']

data4Query = data_colOut.drop(toRemove, axis=1)
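After these cleaning steps, the user and component identifiers still have to be encoded into ordered numbers, as described above, before the training and evaluation files are written to Cloud Storage. Conceptually, the encoding performed inside analyze_and_process_dataset could look like the following sketch; the output column names are our assumption, not the real implementation.

# Hypothetical sketch of the identifier encoding: user and component IDs
# are mapped to consecutive integer codes, which is the input the
# factorization model expects.
data4Query['user_code'] = data4Query['user_userId'].astype('category').cat.codes
data4Query['component_code'] = (
    data4Query['originalComponent_componentId'].astype('category').cat.codes)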

In the next post we will discuss the analysis and processing of the dataset, as well as the implementation of the TensorFlow model and its evaluation. Do you have questions, or do you want to put your data in a safe place? Do not hesitate to contact us; we will be happy to be part of your project.

 

 
