PySpark to Sparkify: music users churn prediction

K Benaggoune
8 min readFeb 18, 2022
Illustration by Efi Chalikopoulou

Intro

Sparkify is a fictional music streaming service created by Udacity. Sparkify offers a free or paid subscription to users who create an account and start listening. Paid users will benefit from some additional advantages such as no advertisements in between songs, and high quality music. Moreover, users have the ability to upgrade, downgrade or cancel their service at any time.

As in many industry sectors, customers churn is the most common problem. The churn rate is the percentage of customers who have ceased to use the business’ product or service in a given time span. As a data scientist in Sparkify company. My role is to prevent customers churn using different information related to the users. Relevant customers data are used to predict if a user will churn or not. Then, Sparkify can proceed with a marketing plan to keep users likely to churn by giving some advantages and discounts.

This work presents a capstone project of the nanodegree data science program from Udacity. For more details please check the code in: https://github.com/khaledbenag/spark_to_sparkify

Project overview

Two months action logs, from 01–10–2018 to 03–12–2018, of Sparkify service are given in a JSON file. This data subset represent 1% of the whole original data. We will extract relevant features from the logs to train a machine learning model to predict user churn (binary classification). We need to keep in mind that the small dataset is used to prepare a PySpark pipeline for processing and modeling that keeps scalability and still working with a large dataset.

In this project, we will learn:

  • How to load a dataset into PySpark framework, perform EDA, transform and save processed features.
  • How to train a machine learning model using PySpark on a supervised classification problem.
  • Perform model parameters tuning using GridSearch strategy with PySpark

Exploratory Data Analysis

Lets load the data and see how many features we got:

path = "mini_sparkify_event_data.json"
# load data
data_log = spark.read.json(path)
data_log.printSchema()
root
|-- artist: string (nullable = true)
|-- auth: string (nullable = true)
|-- firstName: string (nullable = true)
|-- gender: string (nullable = true)
|-- itemInSession: long (nullable = true)
|-- lastName: string (nullable = true)
|-- length: double (nullable = true)
|-- level: string (nullable = true)
|-- location: string (nullable = true)
|-- method: string (nullable = true)
|-- page: string (nullable = true)
|-- registration: long (nullable = true)
|-- sessionId: long (nullable = true)
|-- song: string (nullable = true)
|-- status: long (nullable = true)
|-- ts: long (nullable = true)
|-- userAgent: string (nullable = true)
|-- userId: string (nullable = true)

The dataset presents a total of 18 features. We can classify features into three levels.

User information

There are specific information related to the users such as userId, firstNmae, gender, level, …

user information

In total, we have 226 users with 104 female users, and 121 male users, and 1 None corresponding value. Users generates 286500 different logs, where 8346 logs are with empty userId (“”). Lets check the authentification tag of these missing values in userId:

data_log.filter(data_log.userId=="").groupBy("auth").count()\
.toPandas()
Missing userId values authentification tags

So, only logged-out users and guests are associated with an empty “userId” field, which is a quiet logic, since we have no information. Now lets check how many paid and free action logs:

data_log.groupBy("level").count().toPandas()\
.plot(kind='bar', x = 'level')

After location is transformed to the right form (“state”column) which takes only the abreviation of the state. Now we can plot the states with most Sparkify streaming logs.

data_log.groupBy(“state”).count().toPandas().sort_values(by=’count’, ascending = False).plot(x=”state”, kind=”bar”, figsize=(12,7))

From the graph, California, New york, and Texas cities are the most cities using Sparkify.

Song information

Other information are related to the song such as associated artist, and the song length.

data_log.select(['artist','song','length']).show(5)+----------------+--------------------+---------+
| artist| song| length|
+----------------+--------------------+---------+
| Martha Tilston| Rockpools|277.89016|
|Five Iron Frenzy| Canada|236.09424|
| Adam Lambert| Time For Miracles| 282.8273|
| Enigma|Knocking On Forbi...|262.71302|
| Daft Punk|Harder Better Fas...|223.60771|
+----------------+--------------------+---------+

The dataset counts 58481 different songs for 17656 artists. The most played song is “You’re The One” of Dwight Yoakam which is played over 1122 times, and the most played artist are “Kings Of Leon” over 1841 times.

Logs information

This part is the most important which shows how users logs and interact with Sparkify services. Informaion related to session, time, page, auth, and method are relevant to extract usefull insights.

#  logs information
data_log.select(['time','sessionId', 'itemInSession', 'page', 'auth', 'method', 'status']).show(5)
Logs information

The mini dataset has 2354 unique sessions. Page information are related to the actions of the users:

['Cancel','Submit Downgrade','Thumbs Down','Home','Downgrade', 'Roll Advert', 'Logout', 'Save Settings', 'Cancellation Confirmation','About','Submit', 'Registration', 'Settings', 'Login', 'Register', 'Add to Playlist','Add Friend','NextSong','Thumbs Up','Help','Upgrade','Error','Submit Upgrade']

“page” information are useful, we can notice “Cancellation Confirmation” which gives information about the label. Other information such as Submit Downgrade and Downgrade need more exploration.

Now lets check which is the most visited page:

data_log.groupBy(“page”).count().toPandas().plot(kind= “bar”,
x=”page”,color = “navy”, figsize=(12,7))
Logs count by page type

NextSong is the most visited page.

Moreover, authentification states in Sparkify service are: Logged Out, Cancelled, Guest, Logged In. We can notice that we have four different states. However, Logged Out and Guest will not be used and only Cancelled and Logged In will be explored.

Define Churn

The churn can be defined using the Cancellation Confirmaion page.

# percentage of churned
churned_rate = data_log.groupby("userId").agg({"churned": "sum"}).select(avg("sum(churned)")).collect()[0]["avg(sum(churned))"]
print("churned: {:.2f}%".format(churned_rate * 100))
churned: 23.11%

The impact of been male or female on userChurn:

gender_cancel[gender_cancel.userChurn == 1].plot(x= “gender”, y=”count”, kind= “bar”, color= “navy”)
UserChurn count by gender

Users Males tend to cancel more than females.

The impact of paid and free accounts on userChurn:

free_paid_cancels[free_paid_cancels.userChurn == 1].plot(x = “level”, y=”count”, kind = “bar”, color= “g”)
Level impact on userChurn

Free users cancel more than paid users, maybe it is related to the trial period.

Now lets inspect the average played songs by each user in both classes of churn and NotChurn:

ss = data_log.where(data_log.page == “NextSong”).groupby(‘userChurn’,’userId’).agg(count(‘song’).alias(“countSongs”))\
.toPandas()
ss.head()
# Draw a nested boxplot to show bills by day and time
sns.boxplot(x=”userChurn”, y=”countSongs”,
data=ss).set_title(“Listned songs by each user in each class (Churned/NotChurned)”)
sns.despine(offset=10, trim=True)
Played songs by user churn: 0 for NotChurn, 1 for Churn

It is clear that users who churn likely listen less than other users who not churn.

For more details about the impact of different features on the target, please check my Github repo.

Define features

We will compute the following features from the Sparkify logs to train machine learning models.

  • Given features: gender, location, userAgent, level
  • Toal songs played by each user
  • Distinct songs by user
  • Distinct artists by user
  • Ratio of paid actions (paid actions /(paid+free)actions )
  • subscription length
  • number of errors pages
  • number of help page visit
  • number of GETs/(number of PUTs + number of GETs)
  • average itemInSession by user
  • average song per session

Modeling

The first pipeline is used to transform features, and split the data into train and test:

# pipeline
num_cols = []
for field in df.schema.fields :
if field.dataType!=StringType():
num_cols.append(field.name)
num_cols = [x for x in num_cols if x not in ['label', 'userId']]
def get_data(df, num_cols):
idx_gender = StringIndexer(inputCol='gender', outputCol='gender_idx')
idx_last_level = StringIndexer(inputCol='last_level', outputCol='last_level_idx')
num_cols = num_cols+["gender_idx", "last_level_idx"]
# normalize data
assembler = VectorAssembler(inputCols=num_cols, outputCol='num_cols')
scaler = StandardScaler(inputCol='num_cols', outputCol ='features', withStd=True, withMean=True)
pipeline = Pipeline(stages=[idx_gender, idx_last_level,assembler, scaler ])
df_pipeline = pipeline.fit(df).transform(df)
data = df_pipeline.select(df_pipeline.features, df_pipeline.label)
train, test = data.randomSplit([0.8, 0.2], seed = 36)
return train, test
train, test = get_data(df, num_cols)

Next a machine learning model is defined, and trained on the train data:

# model 
lr = LogisticRegression()
# train and evaluate
lr = lr.fit(train)

Finally, the model is evaluated using F1-score, due to the imbalanced data in our case:

def evaluate(model, test):
# Fit and calculate predictions
results = model.transform(test)
f1_score_evaluator = MulticlassClassificationEvaluator(metricName='f1')
f1_score = f1_score_evaluator.evaluate(results.select(col('label'), col('prediction')))
print('The F1 score on the test set is {:.2%}'.format(f1_score))
auc_evaluator = BinaryClassificationEvaluator()
metric_value = auc_evaluator.evaluate(results, {auc_evaluator.metricName: "areaUnderROC"})
print('The areaUnderROC on the test set is {:.2%}'.format(metric_value))
return
# evaluate
evaluate(lr, test)
The F1 score on the test set is 80.65%
The areaUnderROC on the test set is 92.00%

To enhance the model performance, parameters finetuning is adopted in this work:

# Logistic regression with cross validation
lr = LogisticRegression()
lr_paramGrid = ParamGridBuilder() \
.addGrid(lr.maxIter, [10, 40, 100]) \
.addGrid(lr.elasticNetParam, [0.0, 0.05, 0.1] ) \
.build()
lr_crossval = CrossValidator(estimator=lr,
estimatorParamMaps=lr_paramGrid,
evaluator=BinaryClassificationEvaluator(metricName = ‘areaUnderROC’),
numFolds=5)
# train
lr_crossval = lr_crossval.fit(train)
# evaluate
evaluate(lr_crossval,test)
The F1 score on the test set is 92.00%
The areaUnderROC on the test set is 93.00%

After tuining maxIter and elasticNetParam parameters the F1-score is increased from80.65% to 92%, and the areaUnerCurve from 92% to 93%.

For other models results please check my github repo

Perspectives

In this project, we defined some features relevant to user churn problems from Sparkify logs. Feature selection/extraction is a vast area to discover, and there are different directions in this dataset to explore, such as extracting timestamp related features (registration period, monthly actions, daily logs, …). Moreover, PySpark is an interesting tool that can be combined with other tools like kubeflow for machine learning model monitoring, and kubernetes for model orchestration (models in production).

Conclusion

This project allowed us to get familiar with Spark and to practice several data science skills: data analysis, cleaning, feature extraction, machine learning pipeline creation, model evaluation and fine tuning.

With PySpark, we learned how to run machine learning models in a very real-world problem. We were able to obtain an F1 score of 0.82 for predicting disaffection using a linear regression algorithm, and 0.92 after fine-tuning the parameters; these scores are quite good, although certainly not representative, perhaps because our subset has only 225 distinct users.

--

--

K Benaggoune

I have a PhD in industrial computing. I am experienced with predictive models and medical imaging techniques. Likewise, I love chess and puzzles.