Machine Learning (Spark)

Solution Name: sample_project_spark

This solution demonstrates Machine Learning pipelines on Spark.

The model built in this solution is trained to predict the probability that a specified patient will have a stroke in the next few months.

The solution uses the Random Forest classifier provided by pyspark. It consists of the following components, which perform the feature engineering and finally build a model using the Random Forest classifier.

  • string_indexer - This component encodes a string column of labels to a column of label indices. Extends pyspark’s StringIndexer.

  • one_hot_encoder - This maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. Extends pyspark’s OneHotEncoderEstimator.

  • vector_assembler - Component extending pyspark’s VectorAssembler; it assembles all the features into a single feature vector.

  • feature_engg_and_classifier_pipeline - Machine learning pipeline that combines the string_indexer, one_hot_encoder and vector_assembler components

Shown below is the data snapshot.

[image1: data snapshot]

Each attribute that we want to use as a feature has to go through some transformations using pyspark classes (a usual part of feature preparation in any ML workflow). We want to one-hot encode almost all the attributes (gender, age, hypertension, heart_disease, ever_married, work_type, Residence_type, smoking_status). Rather than working directly with the string values of these attributes, we first index them as integers using pyspark’s StringIndexer. So, for each attribute we have two stages, in this order: string_indexer and one_hot_encoder.
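A minimal sketch of these two per-attribute stages is shown below. The column names come from the data snapshot above; the output column suffixes and the handleInvalid setting are assumptions, and the solution’s string_indexer and one_hot_encoder components wrap the same pyspark classes but may be configured differently:

    # Per-attribute feature-engineering stages (sketch).
    # Assumes Spark 2.x, where the encoder class is OneHotEncoderEstimator
    # (renamed to OneHotEncoder in Spark 3).
    from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

    categorical_cols = ["gender", "age", "hypertension", "heart_disease",
                        "ever_married", "work_type", "Residence_type",
                        "smoking_status"]

    feature_stages = []
    for col in categorical_cols:
        # Index the raw values as integer label indices ...
        indexer = StringIndexer(inputCol=col, outputCol=col + "_index",
                                handleInvalid="keep")
        # ... then one-hot encode the index into a sparse binary vector.
        encoder = OneHotEncoderEstimator(inputCols=[col + "_index"],
                                         outputCols=[col + "_vec"])
        feature_stages += [indexer, encoder]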

Since this is a supervised ML example, our label is the stroke column. We need to convert it into a label index, hence the labelindexer stage. We reuse pyspark’s StringIndexer as the label indexer here.

Finally, vector_assembler is used to aggregate the prepared features into a single feature vector; this is the final feature-engineering stage. It uses pyspark’s VectorAssembler.
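Continuing the sketch, the label indexer and vector assembler stages could look like this (the "label" and "features" output column names are assumptions, not necessarily what the solution’s components use):

    from pyspark.ml.feature import StringIndexer, VectorAssembler

    # Reuse StringIndexer to turn the stroke column into a numeric label.
    label_indexer = StringIndexer(inputCol="stroke", outputCol="label")

    # Assemble the one-hot encoded columns into a single feature vector.
    assembler = VectorAssembler(
        inputCols=[col + "_vec" for col in categorical_cols],
        outputCol="features")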

Training pipeline: The pipeline consists of the following component stages, in the order shown below:

gender-string_indexer → gender-one_hot_encoder → age-string_indexer → age-one_hot_encoder → hypertension-string_indexer → hypertension-one_hot_encoder → heart_disease-string_indexer → heart_disease-one_hot_encoder → ever_married-string_indexer → ever_married-one_hot_encoder → work_type-string_indexer → work_type-one_hot_encoder → Residence_type-string_indexer → Residence_type-one_hot_encoder → smoking_status-string_indexer → smoking_status-one_hot_encoder → labelindexer → vector_assembler
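Putting the sketched stages together, the training pipeline can be assembled and fitted roughly as below. The Random Forest stage is appended as the final estimator; its hyperparameters and the training DataFrame df are placeholders, not the solution’s actual settings:

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import RandomForestClassifier

    # Random Forest classifier consuming the assembled features and indexed label.
    rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                                numTrees=20)

    # Stages run in the order listed above: per-attribute indexers/encoders,
    # then the label indexer, the vector assembler, and finally the classifier.
    pipeline = Pipeline(stages=feature_stages + [label_indexer, assembler, rf])

    model = pipeline.fit(df)  # df: the training DataFrame loaded from HDFS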

How to use this solution

You will work on a clone of this solution. The steps to be followed are:

  1. Clone the solution. Cloning a solution does not clone its code, so you need to copy the code manually (Steps 2-4 below)

  2. Clone the code repository of the sample solution

    1. Navigate to the code repository of the sample solution

    2. Click “Clone” and copy the git clone command

    3. Execute the command on your machine (ensure you have Git installed)

  3. Clone the code repository of the cloned solution

    1. Navigate to the code repository of the cloned solution

    2. Click “Clone” and copy the git clone command

    3. Execute the command on your machine (ensure you have Git installed)

  4. Copy code from the sample solution into the cloned solution

  5. Commit and push the code back into the code repository of the cloned solution

    1. Execute git add -A to add the changed code to the local repository

    2. Execute git commit -m "Cloned code" to commit the code to the local repository

    3. Execute git push to push the code into Bitbucket

  6. Build the cloned solution components and pipeline

    1. Select the “master” branch for each component and pipeline during the build

  7. Before deploying the components and pipelines, you need to upload the data files into the HDFS folder for the solution. Download the contents of the “input” folder under /pipelines/feature-engg-and-classifier-pipeline from the original solution and upload these into the cloned solution.

  8. Deploy the pipeline of the cloned solution. You will need to specify the deployment parameters for the pipeline, not for each component (recall that in Spark, pipelines are executed as a whole, not as combinations of components). You just need to specify the latest build version of the pipeline as the deployment parameter

  9. The pipeline has now been deployed, but has not run. To run the pipeline, start an experiment using the deployed version of each pipeline. Specify the following parameters during the run:

  • Name of the pipeline - <name of the pipeline>

  • Version - latest deployed version

  • Run Name - any run name of your choice (do not use a name which you have already used)

  • Run Description - any description of your choice

  10. To ensure the pipeline has run properly, view the run details.

  11. In xpresso.ai, Spark is run within Kubernetes. You can view the Kubernetes dashboard to see the Spark worker process in action.

  12. The pipeline should have created a model in the “output” folder on HDFS, as well as in the model repository
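As an illustration only (the actual output path depends on the solution’s HDFS layout and is a placeholder here), a model saved by a pyspark pipeline can be loaded back and used for scoring as follows:

    from pyspark.ml import PipelineModel

    # Hypothetical path - substitute the solution's actual "output" folder on HDFS.
    model = PipelineModel.load("hdfs:///path/to/solution/output/model")

    # Score a DataFrame with the same schema as the training data.
    predictions = model.transform(test_df)
    predictions.select("prediction", "probability").show(5)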