Version¶

In [1]:
#check the installed pyspark version
import pyspark
pyspark.__version__
Out[1]:
'3.5.5'

Chargement et préparation des données¶

Lecture et organisation¶

In [2]:
#change the current working directory to where the data file lives
#NOTE(review): hardcoded absolute local path — adjust to your own machine,
#or prefer a configurable DATA_DIR with pathlib for portability
import os
os.chdir("C:/Users/ricco/Desktop/demo")
In [3]:
#csv format
import csv

#list of users (first field of each row)
lst_user = []

#list of itemsets (remaining fields of each row)
lst_iset = []

#context manager guarantees the file is closed even if parsing raises;
#newline="" is the mode recommended by the csv module documentation
with open("market_basket_sequential.csv", "r", newline="") as f:
    #parser reading the file row by row
    parseur = csv.reader(f, delimiter=",")
    for ligne in parseur:
        #split each row into user id and its itemset
        lst_user.append(ligne[0])
        lst_iset.append(ligne[1:])
In [4]:
#number of transactions read (both lists must have the same length)
for lst in (lst_user, lst_iset):
    print(len(lst))
209
209
In [5]:
#assemble the two parallel lists into a Pandas dataframe
import pandas
df_pandas = pandas.DataFrame(
    list(zip(lst_user, lst_iset)),
    columns=["user", "itemsets"],
)

#peek at the first rows
df_pandas.head()
Out[5]:
user itemsets
0 1 [shrimp, almonds, avocado, vegetables mix]
1 1 [burgers, meatballs, eggs]
2 1 [chutney]
3 1 [turkey, avocado]
4 1 [mineral water, milk, energy bar, whole wheat ...
In [6]:
#column dtypes, non-null counts and memory footprint
df_pandas.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 209 entries, 0 to 208
Data columns (total 2 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   user      209 non-null    object
 1   itemsets  209 non-null    object
dtypes: object(2)
memory usage: 3.4+ KB

Démarrage session Spark et mise en forme des données¶

In [7]:
#create (or reuse) a Spark session
from pyspark.sql import SparkSession
#intended: local run with 4 cores and 1 GB of memory
#NOTE(review): local[*] actually uses every available core, and in local
#mode the spark.executor.* settings are largely ignored (driver and
#executor share a single JVM; spark.driver.memory governs memory there)
#— confirm the intended configuration
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "4") \
    .getOrCreate()

#type of the object
print(type(spark))
<class 'pyspark.sql.session.SparkSession'>

Première mise en forme¶

In [8]:
# Enable Apache Arrow to accelerate the Pandas -> PySpark DataFrame
# conversion (see: https://www.geeksforgeeks.org/how-to-convert-pandas-to-pyspark-dataframe/)
# "spark.sql.execution.arrow.enabled" is deprecated since Spark 3.0;
# the supported key for Spark 3.5 is the ".pyspark." variant below.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
In [9]:
#first layout of the data: one Spark row per transaction
df_itemset = spark.createDataFrame(df_pandas)
df_itemset.printSchema()
root
 |-- user: string (nullable = true)
 |-- itemsets: array (nullable = true)
 |    |-- element: string (containsNull = true)

In [10]:
#first rows - statistical unit = transaction
df_itemset.show(5,truncate=False)
+----+---------------------------------------------------+
|user|itemsets                                           |
+----+---------------------------------------------------+
|1   |[shrimp, almonds, avocado, vegetables mix]         |
|1   |[burgers, meatballs, eggs]                         |
|1   |[chutney]                                          |
|1   |[turkey, avocado]                                  |
|1   |[mineral water, milk, energy bar, whole wheat rice]|
+----+---------------------------------------------------+
only showing top 5 rows

Cette organisation se prête à l'analyse des données de transactions : extraction des itemsets fréquents et des règles d'association, sans tenir compte du détenteur du panier (cf. https://www.youtube.com/watch?v=K_b2FkYDjpk)

Seconde mise en forme : données séquentielles¶

In [11]:
#collapse the transactions of each user into a single row,
#turning the itemsets column into a list of lists (one sequence per user)
from pyspark.sql.functions import collect_list

grouped = df_itemset.groupBy("user")
df_user = grouped.agg(collect_list("itemsets"))

#resulting schema
df_user.printSchema()
root
 |-- user: string (nullable = true)
 |-- collect_list(itemsets): array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)

In [12]:
#number of rows = number of distinct users
df_user.count()
Out[12]:
30
In [13]:
#first rows : statistical unit = user
df_user.show(5,truncate=False)
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user|collect_list(itemsets)                                                                                                                                                                                             |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|3   |[[turkey, burgers, mineral water, eggs], [spaghetti, champagne, cookies], [mineral water, salmon], [mineral water], [shrimp, chocolate, chicken, honey], [turkey, eggs], [turkey, fresh tuna, tomatoes, spaghetti]]|
|5   |[[parmesan cheese, spaghetti, soup, avocado], [ground beef, spaghetti, mineral water, milk], [sparkling water], [mineral water, eggs, chicken, chocolate]]                                                         |
|1   |[[shrimp, almonds, avocado, vegetables mix], [burgers, meatballs, eggs], [chutney], [turkey, avocado], [mineral water, milk, energy bar, whole wheat rice], [low fat yogurt]]                                      |
|4   |[[meatballs, milk, honey, french fries], [red wine, shrimp, pasta, pepper], [rice, sparkling water], [spaghetti, mineral water, ham, body spray], [burgers, grated cheese, shrimp, pasta], [eggs]]                 |
|2   |[[whole wheat pasta, french fries], [soup, light cream, shallot], [frozen vegetables, spaghetti, green tea], [french fries], [eggs, pet food], [cookies]]                                                          |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

In [14]:
#rename the aggregated column to "sequence" via an aliased select
#(column order is preserved: user first, then the sequences)
df_user = df_user.select(
    "user",
    df_user["collect_list(itemsets)"].alias("sequence"),
)
df_user.show(5,truncate=False)
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user|sequence                                                                                                                                                                                                           |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|3   |[[turkey, burgers, mineral water, eggs], [spaghetti, champagne, cookies], [mineral water, salmon], [mineral water], [shrimp, chocolate, chicken, honey], [turkey, eggs], [turkey, fresh tuna, tomatoes, spaghetti]]|
|5   |[[parmesan cheese, spaghetti, soup, avocado], [ground beef, spaghetti, mineral water, milk], [sparkling water], [mineral water, eggs, chicken, chocolate]]                                                         |
|1   |[[shrimp, almonds, avocado, vegetables mix], [burgers, meatballs, eggs], [chutney], [turkey, avocado], [mineral water, milk, energy bar, whole wheat rice], [low fat yogurt]]                                      |
|4   |[[meatballs, milk, honey, french fries], [red wine, shrimp, pasta, pepper], [rice, sparkling water], [spaghetti, mineral water, ham, body spray], [burgers, grated cheese, shrimp, pasta], [eggs]]                 |
|2   |[[whole wheat pasta, french fries], [soup, light cream, shallot], [frozen vegetables, spaghetti, green tea], [french fries], [eggs, pet food], [cookies]]                                                          |
+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
only showing top 5 rows

Sequential Pattern Mining¶

Instanciation et paramètres

In [15]:
#import the sequential pattern miner
from pyspark.ml.fpm import PrefixSpan

#instantiate, then configure through the fluent setters
spm = PrefixSpan()
spm.setMinSupport(0.17).setMaxPatternLength(3).setSequenceCol("sequence")

#column used to describe the sequences
print(spm.getSequenceCol())

#parameters - min support
print(spm.getMinSupport())

#max. length of the patterns
print(spm.getMaxPatternLength())
sequence
0.17
3

Entraînement

In [16]:
#mine the frequent sequential patterns
seq_result = spm.findFrequentSequentialPatterns(df_user)

#type of the resulting object (a Spark DataFrame)
type(seq_result)
Out[16]:
pyspark.sql.dataframe.DataFrame
In [17]:
#structure: each row holds a pattern (list of itemsets) and its absolute support
seq_result.printSchema()
root
 |-- sequence: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
 |-- freq: long (nullable = false)

In [18]:
#number of frequent sequential patterns extracted
seq_result.count()
Out[18]:
140
In [19]:
#display -- length-1 sequences are not really sequential patterns
#but they still describe the buying behaviour of the "users"
seq_result.show(truncate=False)
+---------------------+----+
|sequence             |freq|
+---------------------+----+
|[[french fries]]     |13  |
|[[cake]]             |8   |
|[[ground beef]]      |12  |
|[[chicken]]          |9   |
|[[soup]]             |11  |
|[[herb & pepper]]    |10  |
|[[frozen vegetables]]|11  |
|[[escalope]]         |9   |
|[[shrimp]]           |15  |
|[[eggs]]             |26  |
|[[pasta]]            |13  |
|[[energy bar]]       |6   |
|[[milk]]             |14  |
|[[whole wheat pasta]]|8   |
|[[grated cheese]]    |7   |
|[[burgers]]          |17  |
|[[green tea]]        |8   |
|[[spaghetti]]        |21  |
|[[chocolate]]        |18  |
|[[tomatoes]]         |9   |
+---------------------+----+
only showing top 20 rows

In [22]:
#sanity-check the support of one item, e.g. "eggs": the freq reported for
#[[eggs]] must equal the number of users having bought it at least once
#transform the Spark data back into Pandas
dp_user = df_user.toPandas()

#count the users whose sequence contains "eggs" in at least one itemset;
#any() short-circuits on the first hit, replacing the manual flag loop
nb_eggs = sum(
    any("eggs" in itemset for itemset in seq)
    for seq in dp_user["sequence"]
)
print(nb_eggs)
26
In [23]:
#shorthand alias for pyspark SQL functions
import pyspark.sql.functions as F

#keep only the patterns spanning at least two events
#(where is an alias of filter)
res = seq_result.where(F.size(F.col("sequence")) > 1)
res.count()
Out[23]:
112
In [24]:
#display by decreasing (absolute) support -- sort is an alias of orderBy
res.sort(res.freq.desc()).show(truncate=False)
+----------------------------------+----+
|sequence                          |freq|
+----------------------------------+----+
|[[eggs], [mineral water]]         |16  |
|[[spaghetti], [eggs]]             |14  |
|[[mineral water], [eggs]]         |13  |
|[[mineral water], [mineral water]]|12  |
|[[spaghetti], [mineral water]]    |12  |
|[[spaghetti], [chocolate]]        |12  |
|[[eggs], [spaghetti]]             |12  |
|[[burgers], [mineral water]]      |11  |
|[[shrimp], [eggs]]                |11  |
|[[spaghetti], [spaghetti]]        |10  |
|[[pasta], [eggs]]                 |10  |
|[[mineral water], [chocolate]]    |10  |
|[[shrimp], [mineral water]]       |10  |
|[[ground beef], [mineral water]]  |10  |
|[[eggs], [chocolate]]             |10  |
|[[pasta], [mineral water]]        |9   |
|[[chocolate], [mineral water]]    |9   |
|[[mineral water], [spaghetti]]    |9   |
|[[shrimp, pasta], [eggs]]         |9   |
|[[eggs], [ground beef]]           |9   |
+----------------------------------+----+
only showing top 20 rows

In [25]:
#any patterns longer than 2 events?
seq_result.where(F.size(F.col("sequence")) > 2).show(truncate=False)
+-----------------------------------------------+----+
|sequence                                       |freq|
+-----------------------------------------------+----+
|[[chocolate], [mineral water], [chocolate]]    |6   |
|[[chocolate], [spaghetti], [eggs]]             |6   |
|[[pasta], [eggs], [mineral water]]             |6   |
|[[mineral water], [eggs], [spaghetti]]         |6   |
|[[herb & pepper], [eggs], [mineral water]]     |6   |
|[[spaghetti], [eggs], [mineral water]]         |6   |
|[[spaghetti], [eggs], [chocolate]]             |6   |
|[[spaghetti], [mineral water], [eggs]]         |6   |
|[[spaghetti], [mineral water], [mineral water]]|8   |
|[[spaghetti], [mineral water], [spaghetti]]    |6   |
|[[spaghetti], [mineral water], [chocolate]]    |8   |
|[[spaghetti], [spaghetti], [mineral water]]    |7   |
|[[spaghetti], [spaghetti], [chocolate]]        |7   |
|[[spaghetti], [ground beef], [mineral water]]  |6   |
|[[spaghetti], [ground beef], [chocolate]]      |6   |
|[[shrimp], [eggs], [mineral water]]            |6   |
|[[shrimp], [burgers], [mineral water]]         |6   |
|[[eggs], [mineral water], [mineral water]]     |7   |
|[[eggs], [mineral water], [spaghetti]]         |6   |
|[[eggs], [mineral water], [chocolate]]         |6   |
+-----------------------------------------------+----+
only showing top 20 rows

Stopper la session¶

In [26]:
#release the Spark session and its resources
spark.stop()