! sudo apt install -y openjdk-8-jdk
! sudo update-alternatives --config java
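The --config step above is interactive; on a non-interactive runtime the same selection can be made with --set (the JVM path below is an assumption about the stock Ubuntu openjdk-8 package layout):
! sudo update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java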
import numpy as np
import pandas as pd
import json
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm
from datetime import datetime
import glob
import seaborn as sns
import re
import os
import boto3
from botocore import UNSIGNED
from botocore.config import Config
s3 = boto3.resource('s3', config=Config(signature_version=UNSIGNED))
s3.Bucket('penn-cis545-files').download_file('youtube_data.zip', 'youtube_data.zip')
!unzip /content/youtube_data.zip
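As a quick sanity check (assuming the archive extracts to /content/youtube_data, the path used throughout below), list the extracted files:
sorted(os.listdir('/content/youtube_data'))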
The dataset is a daily record of the top trending YouTube videos.
To determine the year’s top-trending videos, YouTube uses a combination of factors, including user interactions (number of views, shares, comments, and likes). Top performers on the YouTube trending list are music videos (such as the famously viral “Gangnam Style”), celebrity and/or reality TV performances, and the random dude-with-a-camera viral videos that YouTube is well known for.
This dataset includes several months (and counting) of data on daily trending YouTube videos. Data is included for numerous countries, with up to 200 listed trending videos per day.
Each region’s data is in a separate file and includes the video id, trending date, title, channel title, publish time, tags, views, likes and dislikes, thumbnail link, description, and comment count.
The data also includes a category_id field, which varies between regions.
The dataset contains multiple CSV files, one per country, so the first step is to read them and combine them into a single dataframe.
# Import all the csv files (sorted so that the order matches the country list below)
files = sorted(glob.glob('/content/youtube_data/*.csv'))
files
# Combine all into a single dataframe "combined_data" and add a 'country' column.
all_dataframes = []
countries = ['CA', 'FR', 'IN', 'US']
for csv_path, country in zip(files, countries):
    df = pd.read_csv(csv_path, header=0).set_index('video_id')
    df['country'] = country
    all_dataframes.append(df)
combined_data = pd.concat(all_dataframes)
combined_data
# Read the category_id.json file and map the category_id's in the dataframe to the category name.
combined_data['category_id'] = combined_data['category_id'].astype(str)
def find_title(item):
    return item['title']

# The 'items' field of the JSON (the third column after read_json) holds one record per category
category_df = pd.DataFrame(pd.read_json('/content/youtube_data/US_category_id.json').iloc[:, 2].tolist())
category_df['title'] = category_df['snippet'].apply(find_title)

def map_cat(id_str):
    return category_df.loc[category_df.id == id_str, 'title'].values[0]
combined_data.insert(4, 'category',combined_data['category_id'].apply(map_cat))
combined_data
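A slightly faster equivalent for the mapping above (a sketch: one dictionary lookup per row instead of a DataFrame scan) would be:
# cat_map = category_df.set_index('id')['title'].to_dict()
# combined_data['category'] = combined_data['category_id'].map(cat_map)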
def trending_datetime(time):
    return pd.to_datetime('20' + time, format='%Y.%d.%m', errors='ignore')
combined_data['trending_date'] = combined_data['trending_date'].apply(trending_datetime)
combined_data['publish_time'] = pd.to_datetime(combined_data['publish_time'])
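Before dropping rows with missing values (the next step), a quick check of where the gaps are can be useful:
combined_data.isna().sum()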
# remove NA's
combined_data = combined_data.dropna()
# Print some simple statistics (mean, standard deviation, min and max) for each of the numerical features in the dataset.
maxs = combined_data[['views','likes','dislikes','comment_count']].apply(np.max,axis=0)
mins = combined_data[['views','likes','dislikes','comment_count']].apply(np.min,axis=0)
stds = combined_data[['views','likes','dislikes','comment_count']].apply(np.std,axis=0)
means = combined_data[['views','likes','dislikes','comment_count']].apply(np.mean,axis=0)
maxs
mins
stds
means
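The same summary is available in one call via describe(); for reference (note that describe() reports the sample standard deviation, ddof=1, whereas np.std defaults to ddof=0):
combined_data[['views','likes','dislikes','comment_count']].describe()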
# Rescale the likes, views, dislikes and comment_count to log scale (base e) to avoid numerical instability issues.
# np.log1p(x) = log(1 + x), so zero counts in likes, dislikes and comments are handled without a separate +1 helper.
combined_data['likes_log'] = np.log1p(combined_data['likes'])
combined_data['views_log'] = np.log(combined_data['views'])
combined_data['dislikes_log'] = np.log1p(combined_data['dislikes'])
combined_data['comment_log'] = np.log1p(combined_data['comment_count'])
sns.distplot(combined_data['likes_log'])
sns.distplot(combined_data['views_log'])
sns.distplot(combined_data['dislikes_log'])
sns.distplot(combined_data['comment_log'])
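Note that sns.distplot is deprecated in seaborn 0.11+; on newer versions an equivalent plot (shown for one column as a sketch) is:
# sns.histplot(combined_data['views_log'], kde=True)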
As a next step, we try to gain some insight into the data using the categories, views, likes, and dislikes.
# Number of videos for each category
plt.figure(figsize=(10, 5))
chart = sns.countplot(
    data=combined_data,
    x='category',
    palette='Set1',
    order=combined_data['category'].value_counts().index
)
plt.xticks(
    rotation=45,
    horizontalalignment='right',
    fontweight='light',
    fontsize='x-large'
)
# The distribution of views across categories
chart = sns.catplot(
    data=combined_data,
    x='category',
    y='views_log',
    kind='box',
    palette='Set1'
)
plt.xticks(
    rotation=45,
    horizontalalignment='right',
    fontweight='light'
)
# The distribution of dislikes across categories
chart = sns.catplot(
    data=combined_data,
    x='category',
    y='dislikes_log',
    kind='box',
    palette='Set1'
)
plt.xticks(
    rotation=45,
    horizontalalignment='right',
    fontweight='light'
)
# Count the number of tags.
combined_data["num_tags"]=combined_data['tags'].apply(lambda x:x.count("|")+1)
# Compute the length of description
combined_data["desc_len"]=combined_data['description'].apply(len)
# Compute the length of title
combined_data["len_title"]=combined_data['title'].apply(len)
# Split the 'publish_time' feature into three parts: weekday, date, and time
combined_data['publish_weekday']= combined_data['publish_time'].apply(lambda x:x.weekday()+1)
combined_data['publish_date'] = combined_data['publish_time'].apply(lambda x:x.date())
combined_data['publish_time'] = combined_data['publish_time'].apply(lambda x:x.time())
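For reference, the three apply calls above can also be written with pandas' vectorized .dt accessor (against the original datetime column, i.e. before the last line overwrites publish_time with plain time objects):
# combined_data['publish_weekday'] = combined_data['publish_time'].dt.weekday + 1
# combined_data['publish_date'] = combined_data['publish_time'].dt.date
# combined_data['publish_time'] = combined_data['publish_time'].dt.time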
# Plot the number of videos published on each day of the week
plt.figure(figsize=(10, 5))
chart = sns.countplot(
    data=combined_data,
    x='publish_weekday',
    palette='Set1',
    order=combined_data['publish_weekday'].value_counts().index
)
# Drop the columns that are no longer needed for modeling
combined_data = combined_data.drop(['title', 'channel_title', 'category', 'tags', 'views', 'likes', 'dislikes', 'comment_count', 'thumbnail_link', 'description', 'publish_date', 'publish_time', 'trending_date'], axis=1)
combined_data.publish_weekday = combined_data.publish_weekday.astype('category')
combined_data.country = combined_data.country.astype('category')
combined_data = pd.concat([combined_data, pd.get_dummies(combined_data['publish_weekday'])], axis=1)
combined_data = pd.concat([combined_data, pd.get_dummies(combined_data['country'])], axis=1)
combined_data = pd.concat([combined_data, pd.get_dummies(combined_data['category_id'])], axis=1)
combined_data = combined_data.drop(['category_id','country','publish_weekday'],axis=1)
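For reference, the dummy-encoding steps above can be collapsed into a single call; note that this variant prefixes each dummy column with its source column name, so the resulting column names differ from the manual version:
# combined_data = pd.get_dummies(combined_data, columns=['publish_weekday', 'country', 'category_id'])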
# Write out the modified data to a file
combined_data_sec_2 = combined_data.copy()
combined_data_sec_2.rename(columns = {'views_log':'label'}, inplace = True)
combined_data_sec_2.to_csv('combined_data.csv')
# Read the prepared data back in and split it into features and label
combined_data = pd.read_csv('combined_data.csv').set_index('video_id')
label = combined_data['label']
features = combined_data.drop(['label'],axis=1)
from sklearn.model_selection import train_test_split
x_train, x_test, y_train, y_test = train_test_split(features, label, test_size=0.2)
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error
clf = LinearRegression()
clf.fit(x_train, y_train)
y_pred = clf.predict(x_test)
print("Score:", clf.score(x_test, y_test))
plt.figure(dpi=100)
plt.scatter(y_test, y_pred)
# Different error measures
print("MAE:", mean_absolute_error(y_test, y_pred))
print('MSE:', mean_squared_error(y_test, y_pred))
print('RMSE:', np.sqrt(mean_squared_error(y_test, y_pred)))
Next, use principal component analysis (PCA) to reduce the dimensionality of the dataset.
import numpy as np
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
# Standardize the features and fit PCA to inspect the explained variance of each component
X = StandardScaler().fit_transform(features)
pca = PCA(n_components=38)
X2 = pca.fit_transform(X)
np.set_printoptions(suppress=True)
pca.explained_variance_ratio_
pc_vs_variance = np.cumsum(pca.explained_variance_ratio_)
# Plot the explained_variance_ratio against the number of components
pc_vs_variance
plt.plot(pc_vs_variance)
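This curve motivates the choice of 31 components below; for reference, the cutoff can also be picked programmatically (the 95% threshold here is an assumption, not taken from the original analysis):
n_keep = int(np.argmax(pc_vs_variance >= 0.95)) + 1
n_keep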
# Fit PCA with 31 components on the training set and project it
pca = PCA(n_components=31)
pca.fit(x_train)
x_train_2 = pca.transform(x_train)
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import GridSearchCV
# Grid search over the number of trees and the maximum depth
param_grid = {
    'n_estimators': [20, 30, 40],
    'max_depth': [8, 12, 16]
}
CV_clf = GridSearchCV(estimator=RandomForestRegressor(), param_grid=param_grid,cv=3)
CV_clf.fit(x_train_2, y_train)
CV_clf.best_params_
# Fit the random forest on the training data
rfr = RandomForestRegressor(max_depth=16,n_estimators=40)
rfr.fit(x_train_2,y_train)
# Project the test set with the PCA fitted on the training data (do not refit on the test set)
x_test_2 = pca.transform(x_test)
# Make predictions
y_pred=rfr.predict(x_test_2)
np.sqrt(mean_squared_error(y_test, y_pred))
!apt install libkrb5-dev
# Note: older Spark releases are moved off the mirrors over time; if this URL 404s, the same archive should be available under https://archive.apache.org/dist/spark/
!wget https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install findspark
!pip install sparkmagic
!pip install pyspark
!pip install seaborn --user
!pip install plotly --user
!pip install imageio --user
!pip install folium --user
!apt update
!apt install gcc python-dev libkrb5-dev
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os
spark = SparkSession.builder.appName('ml-hw4').getOrCreate()
%load_ext sparkmagic.magics
#graph section
import networkx as nx
# SQLite RDBMS
import sqlite3
# Parallel processing
# import swifter
import pandas as pd
# NoSQL DB
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError, OperationFailure
import os
os.environ['SPARK_HOME'] = '/content/spark-2.4.5-bin-hadoop2.7'
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
from pyspark.sql import SQLContext
try:
    if spark is None:
        spark = SparkSession.builder.appName('Initial').getOrCreate()
    sqlContext = SQLContext(spark)
except NameError:
    spark = SparkSession.builder.appName('Initial').getOrCreate()
    sqlContext = SQLContext(spark)
train_sdf = spark.read.format('csv').options(header='true', inferSchema='true').load('combined_data.csv')
train_sdf.show()
#Print the dataframe schema and verify
train_sdf.printSchema()
from pyspark.ml.feature import StringIndexer, VectorAssembler
# Drop unwanted columns
all_columns = train_sdf.columns
drop_columns = ['video_id','label']
columns_to_use = [i for i in all_columns if i not in drop_columns]
# Create a VectorAssembler object
assembler = VectorAssembler(inputCols=columns_to_use, outputCol='features')
In this step, we create a pipeline with a single stage: the assembler.
from pyspark.ml import Pipeline
# Create a pipeline
pipeline = Pipeline(stages=[assembler])
modified_data_sdf = pipeline.fit(train_sdf).transform(train_sdf).drop(*columns_to_use)
modified_data_sdf.show()
# Split into an 80-20 ratio between the train and test sets.
train_sdf, test_sdf = modified_data_sdf.randomSplit([0.8, 0.2], seed = 2020)
Next, we use Spark ML's linear regression to train a model and predict the (log-scaled) views.
from pyspark.ml.regression import LinearRegression
# Train a linear regression model
lr = LinearRegression(featuresCol = 'features', labelCol='label')
lr_model = lr.fit(train_sdf)
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
predictions = lr_model.transform(test_sdf)
from pyspark.ml.evaluation import RegressionEvaluator
# Compute root mean squared error on the test set
test_result = lr_model.evaluate(test_sdf)
test_rmse_orig = test_result.rootMeanSquaredError
test_rmse_orig
# Add regularization to avoid overfitting: try L1 (lasso), L2 (ridge) and elastic net penalties
lrl1 = LinearRegression(featuresCol = 'features', labelCol='label',regParam=0.1,elasticNetParam=1.0)
lrl1_model = lrl1.fit(train_sdf)
lrl2 = LinearRegression(featuresCol = 'features', labelCol='label',regParam=0.2,elasticNetParam=0.0)
lrl2_model = lrl2.fit(train_sdf)
lre = LinearRegression(featuresCol = 'features', labelCol='label',regParam=0.1,elasticNetParam=0.5)
lre_model = lre.fit(train_sdf)
# Compute predictions using each of the models
l1_predictions = lrl1_model.transform(test_sdf)
l2_predictions = lrl2_model.transform(test_sdf)
elastic_net_predictions = lre_model.transform(test_sdf)
# Compute root mean squared error on test set for each of your models
test_result_l1 = lrl1_model.evaluate(test_sdf)
test_rmse_l1 = test_result_l1.rootMeanSquaredError
test_result_l2 = lrl2_model.evaluate(test_sdf)
test_rmse_l2 = test_result_l2.rootMeanSquaredError
test_result_e = lre_model.evaluate(test_sdf)
test_rmse_elastic = test_result_e.rootMeanSquaredError
print(test_rmse_l1)
print(test_rmse_l2)
print(test_rmse_elastic)
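As an aside (not part of the original workflow), the same regularization sweep can be written with Spark ML's built-in tuning utilities; a minimal sketch using 3-fold cross-validation over the same regParam/elasticNetParam values:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
# Grid over the same penalties tried manually above
reg_grid = (ParamGridBuilder()
            .addGrid(lr.regParam, [0.1, 0.2])
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
            .build())
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=reg_grid,
                    evaluator=RegressionEvaluator(labelCol='label', metricName='rmse'),
                    numFolds=3)
cv_model = cv.fit(train_sdf)
# Evaluate the best model found by cross-validation on the held-out test set
RegressionEvaluator(labelCol='label', metricName='rmse').evaluate(cv_model.transform(test_sdf))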
from pyspark.ml.regression import RandomForestRegressor
# Create a random forest regressor model, fit the training data and evaluate using RegressionEvaluator
rf_reg = RandomForestRegressor(labelCol="label", featuresCol="features")
rf_model = rf_reg.fit(train_sdf)
train_pred = rf_model.transform(train_sdf)
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
train_rmse_rf = evaluator.evaluate(train_pred)
train_rmse_rf
# Predictions on the test set
predictions = rf_model.transform(test_sdf)
from pyspark.ml.evaluation import RegressionEvaluator
# Calculate the rmse on the test set
rmse_rf = evaluator.evaluate(predictions)
rmse_rf
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import PCA as PCAml
pca = PCAml(k=31, inputCol='features', outputCol='pcaFeature')
pca_fit = pca.fit(train_sdf)
train_sdf_pca=pca_fit.transform(train_sdf)
pca_model = LinearRegression(featuresCol='pcaFeature', labelCol='label').fit(train_sdf_pca)  # fit on the PCA output, not the original 'features' column
pcaSummary = pca_model.summary
training_rmse_pca = pcaSummary.rootMeanSquaredError
training_rmse_pca
from pyspark.ml.evaluation import RegressionEvaluator
test_sdf_pca=pca_fit.transform(test_sdf)
# Get predictions on the test set
predictions_pca = pca_model.transform(test_sdf_pca)
test_result_pca = pca_model.evaluate(test_sdf_pca)
# Get RMSE for test data
test_rmse_pca = test_result_pca.rootMeanSquaredError
test_rmse_pca