Version¶
In [1]:
#pyspark
import pyspark
pyspark.__version__
Out[1]:
'3.5.5'
Loading and preparing the data¶
Reading and organizing the data¶
In [2]:
#change the current working directory
import os
os.chdir("C:/Users/ricco/Desktop/demo")
In [3]:
#csv format
import csv
#open the file for reading
f = open("market_basket_sequential.csv","r")
#parser to read each line
parseur = csv.reader(f,delimiter=",")
#list of users
lst_user = []
#list of itemsets
lst_iset = []
#for each line read
for ligne in parseur:
    #grab the info: the user id, then the purchased items
    lst_user.append(ligne[0])
    lst_iset.append(ligne[1:])
#close the file
f.close()
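As an aside, the same reading can be written with a context manager, which closes the file automatically even if an exception is raised. A minimal equivalent sketch:

import csv
lst_user, lst_iset = [], []
#the with-block closes the file automatically on exit
with open("market_basket_sequential.csv","r") as f:
    for ligne in csv.reader(f,delimiter=","):
        lst_user.append(ligne[0])
        lst_iset.append(ligne[1:])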
In [4]:
#number of transactions
print(len(lst_user))
print(len(lst_iset))
209
209
In [5]:
#put the data into a Pandas dataframe
import pandas
df_pandas = pandas.DataFrame({"user":lst_user,"itemsets":lst_iset})
#first rows
df_pandas.head()
Out[5]:
|   | user | itemsets |
|---|------|----------|
| 0 | 1 | [shrimp, almonds, avocado, vegetables mix] |
| 1 | 1 | [burgers, meatballs, eggs] |
| 2 | 1 | [chutney] |
| 3 | 1 | [turkey, avocado] |
| 4 | 1 | [mineral water, milk, energy bar, whole wheat ... |
In [6]:
#info
df_pandas.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 209 entries, 0 to 208
Data columns (total 2 columns):
 #   Column    Non-Null Count  Dtype
---  ------    --------------  -----
 0   user      209 non-null    object
 1   itemsets  209 non-null    object
dtypes: object(2)
memory usage: 3.4+ KB
Starting the Spark session and shaping the data¶
In [7]:
#create a session
from pyspark.sql import SparkSession
#local mode; local[*] uses all the available cores,
#with 1 GB of memory and 4 cores per executor
spark = SparkSession.builder \
        .master("local[*]") \
        .config("spark.executor.memory", "1g") \
        .config("spark.executor.cores", "4") \
        .getOrCreate()
#type of the object
print(type(spark))
<class 'pyspark.sql.session.SparkSession'>
First layout¶
In [8]:
# enable Apache Arrow to speed up the conversion
# from Pandas to a PySpark DataFrame
# see: https://www.geeksforgeeks.org/how-to-convert-pandas-to-pyspark-dataframe/
# /!\ without it, the conversion fails on this data
# (note: this configuration key is the pre-3.0 name, kept for backward
# compatibility; Spark 3.x calls it "spark.sql.execution.arrow.pyspark.enabled")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
In [9]:
#first form of the dataframe
df_itemset = spark.createDataFrame(df_pandas)
df_itemset.printSchema()
root
 |-- user: string (nullable = true)
 |-- itemsets: array (nullable = true)
 |    |-- element: string (containsNull = true)
In [10]:
#first rows - statistical unit = the transaction
df_itemset.show(5,truncate=False)
+----+---------------------------------------------------+
|user|itemsets                                           |
+----+---------------------------------------------------+
|1   |[shrimp, almonds, avocado, vegetables mix]         |
|1   |[burgers, meatballs, eggs]                         |
|1   |[chutney]                                          |
|1   |[turkey, avocado]                                  |
|1   |[mineral water, milk, energy bar, whole wheat rice]|
+----+---------------------------------------------------+
only showing top 5 rows
This layout lends itself to the analysis of transaction data: mining frequent itemsets and association rules, without taking the basket owner into account (see https://www.youtube.com/watch?v=K_b2FkYDjpk).
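For illustration, that transaction-level analysis could be run on the same df_itemset with Spark's FPGrowth; a minimal sketch, where the minSupport and minConfidence values are arbitrary assumptions and the cell is not part of the run above:

#frequent itemsets and association rules on the transactions
from pyspark.ml.fpm import FPGrowth
fpg = FPGrowth(itemsCol="itemsets", minSupport=0.1, minConfidence=0.5)
model = fpg.fit(df_itemset)
model.freqItemsets.show(5,truncate=False)
model.associationRules.show(5,truncate=False)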
Second layout: sequential data¶
In [11]:
#group by user
#to get one list of itemset lists per user
from pyspark.sql.functions import collect_list
df_user = df_itemset.groupBy("user").agg(collect_list("itemsets"))
#structure
df_user.printSchema()
root
 |-- user: string (nullable = true)
 |-- collect_list(itemsets): array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
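One caveat before mining: collect_list makes no guarantee about the order in which the itemsets are gathered, whereas the order of events is exactly what sequential pattern mining exploits. If the row order of the file must be preserved, one possible sketch (assuming the read order is the chronological order) tags each transaction with an increasing id and sorts on it:

from pyspark.sql import functions as F
#tag each transaction, then sort the collected structs by that tag
df_idx = df_itemset.withColumn("rank", F.monotonically_increasing_id())
df_seq = (df_idx.groupBy("user")
                .agg(F.sort_array(F.collect_list(F.struct("rank","itemsets"))).alias("tmp"))
                .withColumn("sequence", F.col("tmp.itemsets"))
                .drop("tmp"))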
In [12]:
#number of users
df_user.count()
Out[12]:
30
In [13]:
#first rows - statistical unit = the user
df_user.show(5,truncate=False)
+----+----------------------+
|user|collect_list(itemsets)|
+----+----------------------+
|3   |[[turkey, burgers, mineral water, eggs], [spaghetti, champagne, cookies], [mineral water, salmon], [mineral water], [shrimp, chocolate, chicken, honey], [turkey, eggs], [turkey, fresh tuna, tomatoes, spaghetti]]|
|5   |[[parmesan cheese, spaghetti, soup, avocado], [ground beef, spaghetti, mineral water, milk], [sparkling water], [mineral water, eggs, chicken, chocolate]]|
|1   |[[shrimp, almonds, avocado, vegetables mix], [burgers, meatballs, eggs], [chutney], [turkey, avocado], [mineral water, milk, energy bar, whole wheat rice], [low fat yogurt]]|
|4   |[[meatballs, milk, honey, french fries], [red wine, shrimp, pasta, pepper], [rice, sparkling water], [spaghetti, mineral water, ham, body spray], [burgers, grated cheese, shrimp, pasta], [eggs]]|
|2   |[[whole wheat pasta, french fries], [soup, light cream, shallot], [frozen vegetables, spaghetti, green tea], [french fries], [eggs, pet food], [cookies]]|
+----+----------------------+
only showing top 5 rows
In [14]:
#rename the column holding the sequences
df_user = df_user.withColumnRenamed('collect_list(itemsets)','sequence')
df_user.show(5,truncate=False)
+----+--------+
|user|sequence|
+----+--------+
|3   |[[turkey, burgers, mineral water, eggs], [spaghetti, champagne, cookies], [mineral water, salmon], [mineral water], [shrimp, chocolate, chicken, honey], [turkey, eggs], [turkey, fresh tuna, tomatoes, spaghetti]]|
|5   |[[parmesan cheese, spaghetti, soup, avocado], [ground beef, spaghetti, mineral water, milk], [sparkling water], [mineral water, eggs, chicken, chocolate]]|
|1   |[[shrimp, almonds, avocado, vegetables mix], [burgers, meatballs, eggs], [chutney], [turkey, avocado], [mineral water, milk, energy bar, whole wheat rice], [low fat yogurt]]|
|4   |[[meatballs, milk, honey, french fries], [red wine, shrimp, pasta, pepper], [rice, sparkling water], [spaghetti, mineral water, ham, body spray], [burgers, grated cheese, shrimp, pasta], [eggs]]|
|2   |[[whole wheat pasta, french fries], [soup, light cream, shallot], [frozen vegetables, spaghetti, green tea], [french fries], [eggs, pet food], [cookies]]|
+----+--------+
only showing top 5 rows
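The rename step can also be avoided by aliasing the aggregate directly when grouping, e.g.:

#equivalent in one step
df_user = df_itemset.groupBy("user").agg(collect_list("itemsets").alias("sequence"))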
Sequential Pattern Mining¶
Instantiation and parameters
In [15]:
#import the class
from pyspark.ml.fpm import PrefixSpan
#instantiation
spm = PrefixSpan(minSupport=0.17,maxPatternLength=3,sequenceCol="sequence")
#column describing the sequences
print(spm.getSequenceCol())
#parameter - min support
print(spm.getMinSupport())
#max length of the patterns
print(spm.getMaxPatternLength())
sequence
0.17
3
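Note that minSupport is a fraction of the number of sequences (here, the 30 users): a pattern is retained only if it appears in at least 0.17 × 30 = 5.1, i.e. 6, user sequences, which matches the minimum freq values in the outputs below. A quick check:

#absolute threshold implied by minSupport = 0.17 on 30 sequences
import math
print(math.ceil(0.17 * 30)) #6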
Training
In [16]:
#mine the frequent sequential patterns
seq_result = spm.findFrequentSequentialPatterns(df_user)
#type of the resulting object
type(seq_result)
Out[16]:
pyspark.sql.dataframe.DataFrame
In [17]:
#structure
seq_result.printSchema()
root
 |-- sequence: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
 |-- freq: long (nullable = false)
In [18]:
#number of extracted sequential patterns
seq_result.count()
Out[18]:
140
In [19]:
#display -- the length-1 sequences are not really "sequential"
#but they do tell us about the behavior of the "users"
seq_result.show(truncate=False)
+---------------------+----+
|sequence             |freq|
+---------------------+----+
|[[french fries]]     |13  |
|[[cake]]             |8   |
|[[ground beef]]      |12  |
|[[chicken]]          |9   |
|[[soup]]             |11  |
|[[herb & pepper]]    |10  |
|[[frozen vegetables]]|11  |
|[[escalope]]         |9   |
|[[shrimp]]           |15  |
|[[eggs]]             |26  |
|[[pasta]]            |13  |
|[[energy bar]]       |6   |
|[[milk]]             |14  |
|[[whole wheat pasta]]|8   |
|[[grated cheese]]    |7   |
|[[burgers]]          |17  |
|[[green tea]]        |8   |
|[[spaghetti]]        |21  |
|[[chocolate]]        |18  |
|[[tomatoes]]         |9   |
+---------------------+----+
only showing top 20 rows
In [22]:
#let us check for one product, e.g. "eggs"
#convert the data to Pandas
dp_user = df_user.toPandas()
#number of "users" who bought the product at least once
nb_eggs = 0
for i in range(dp_user.shape[0]):
    seq = dp_user.sequence.iloc[i]
    ok = False
    for itemset in seq:
        ok = ok or ("eggs" in itemset)
    nb_eggs = nb_eggs + int(ok)
print(nb_eggs)
26
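The same cross-check can stay on the Spark side with the higher-order function exists (available since Spark 3.1); a sketch:

#users with at least one itemset containing "eggs"
from pyspark.sql import functions as F
print(df_user.filter(F.exists("sequence", lambda s: F.array_contains(s, "eggs"))).count()) #26, matching the loop above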
In [23]:
#alias for the pyspark functions module
import pyspark.sql.functions as F
#keep the sequences with at least 2 events (itemsets)
res = seq_result.filter(F.size(F.col("sequence")) > 1)
res.count()
Out[23]:
112
In [24]:
#display by decreasing (absolute) support
res.orderBy(res.freq.desc()).show(truncate=False)
+----------------------------------+----+
|sequence                          |freq|
+----------------------------------+----+
|[[eggs], [mineral water]]         |16  |
|[[spaghetti], [eggs]]             |14  |
|[[mineral water], [eggs]]         |13  |
|[[mineral water], [mineral water]]|12  |
|[[spaghetti], [mineral water]]    |12  |
|[[spaghetti], [chocolate]]        |12  |
|[[eggs], [spaghetti]]             |12  |
|[[burgers], [mineral water]]      |11  |
|[[shrimp], [eggs]]                |11  |
|[[spaghetti], [spaghetti]]        |10  |
|[[pasta], [eggs]]                 |10  |
|[[mineral water], [chocolate]]    |10  |
|[[shrimp], [mineral water]]       |10  |
|[[ground beef], [mineral water]]  |10  |
|[[eggs], [chocolate]]             |10  |
|[[pasta], [mineral water]]        |9   |
|[[chocolate], [mineral water]]    |9   |
|[[mineral water], [spaghetti]]    |9   |
|[[shrimp, pasta], [eggs]]         |9   |
|[[eggs], [ground beef]]           |9   |
+----------------------------------+----+
only showing top 20 rows
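If a relative support is preferred, the absolute frequency can simply be divided by the number of sequences; a quick sketch:

#relative support = absolute frequency / number of users
n_users = df_user.count()
res.withColumn("support", F.col("freq")/n_users).orderBy(F.desc("freq")).show(5,truncate=False)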
In [25]:
#sequences of length > 2?
seq_result.filter(F.size(F.col("sequence")) > 2).show(truncate=False)
+-----------------------------------------------+----+
|sequence                                       |freq|
+-----------------------------------------------+----+
|[[chocolate], [mineral water], [chocolate]]    |6   |
|[[chocolate], [spaghetti], [eggs]]             |6   |
|[[pasta], [eggs], [mineral water]]             |6   |
|[[mineral water], [eggs], [spaghetti]]         |6   |
|[[herb & pepper], [eggs], [mineral water]]     |6   |
|[[spaghetti], [eggs], [mineral water]]         |6   |
|[[spaghetti], [eggs], [chocolate]]             |6   |
|[[spaghetti], [mineral water], [eggs]]         |6   |
|[[spaghetti], [mineral water], [mineral water]]|8   |
|[[spaghetti], [mineral water], [spaghetti]]    |6   |
|[[spaghetti], [mineral water], [chocolate]]    |8   |
|[[spaghetti], [spaghetti], [mineral water]]    |7   |
|[[spaghetti], [spaghetti], [chocolate]]        |7   |
|[[spaghetti], [ground beef], [mineral water]]  |6   |
|[[spaghetti], [ground beef], [chocolate]]      |6   |
|[[shrimp], [eggs], [mineral water]]            |6   |
|[[shrimp], [burgers], [mineral water]]         |6   |
|[[eggs], [mineral water], [mineral water]]     |7   |
|[[eggs], [mineral water], [spaghetti]]         |6   |
|[[eggs], [mineral water], [chocolate]]         |6   |
+-----------------------------------------------+----+
only showing top 20 rows
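To zoom in on specific behaviors, the patterns can also be filtered on their last step; for instance (a sketch), keeping those whose final itemset contains "chocolate":

#patterns whose last itemset contains "chocolate"
seq_result.filter(F.array_contains(F.element_at("sequence", -1), "chocolate")).show(truncate=False)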
Stopping the session¶
In [26]:
#stop
spark.stop()