Version¶
In [1]:
# Import PySpark and display the installed version
import pyspark
pyspark.__version__
Out[1]:
'3.5.4'
Chargement et inspection des données¶
In [2]:
# Change the current working directory
# NOTE(review): absolute, machine-specific path; presumably the folder holding
# the data file read later with a relative name -- adapt to your environment
import os
os.chdir("C:/Users/ricco/Desktop/demo")
In [3]:
# Create the Spark session
from pyspark.sql import SparkSession

# Local mode limited to 4 worker threads ("local[*]" would use every available
# core, and spark.executor.cores is not honoured in local mode).
# In local mode the executor lives inside the driver JVM, so the 1 GB memory
# cap is expressed with spark.driver.memory (spark.executor.memory has no
# effect there). The setting is only taken into account when the JVM has not
# been started yet, i.e. on a fresh kernel.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.driver.memory", "1g")
    .getOrCreate()
)

# Type of the object
print(type(spark))
<class 'pyspark.sql.session.SparkSession'>
In [4]:
# Load the file: first line holds the column names, column types are inferred
df = spark.read.options(header=True, inferSchema=True).csv("kddcup99twice.txt")
# Type of the object
print(type(df))
<class 'pyspark.sql.dataframe.DataFrame'>
In [5]:
# Dimensions: number of rows, then number of columns
print(
    f"Lignes : {df.count()}",
    f"Colonnes : {len(df.columns)}",
    sep="\n",
)
Lignes : 9796862 Colonnes : 42
In [6]:
# First rows of the DataFrame
df.show(n=5)
+---+---+----+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+ | V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|V29|V30|V31|V32|V33|V34|V35|V36|V37|V38|V39|V40|V41| V42| +---+---+----+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+ | 0|tcp|http| SF|215|45076| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1|0.0|0.0|0.0|0.0|1.0|0.0|0.0| 0| 0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|normal.| | 0|tcp|http| SF|215|45076| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1|0.0|0.0|0.0|0.0|1.0|0.0|0.0| 0| 0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|normal.| | 0|tcp|http| SF|162| 4528| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 2| 2|0.0|0.0|0.0|0.0|1.0|0.0|0.0| 1| 1|1.0|0.0|1.0|0.0|0.0|0.0|0.0|0.0|normal.| | 0|tcp|http| SF|162| 4528| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 2| 2|0.0|0.0|0.0|0.0|1.0|0.0|0.0| 1| 1|1.0|0.0|1.0|0.0|0.0|0.0|0.0|0.0|normal.| | 0|tcp|http| SF|236| 1228| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1|0.0|0.0|0.0|0.0|1.0|0.0|0.0| 2| 2|1.0|0.0|0.5|0.0|0.0|0.0|0.0|0.0|normal.| +---+---+----+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+ only showing top 5 rows
In [7]:
# Structure of the table: column names, types and nullability
df.printSchema()
root |-- V1: integer (nullable = true) |-- V2: string (nullable = true) |-- V3: string (nullable = true) |-- V4: string (nullable = true) |-- V5: integer (nullable = true) |-- V6: integer (nullable = true) |-- V7: integer (nullable = true) |-- V8: integer (nullable = true) |-- V9: integer (nullable = true) |-- V10: integer (nullable = true) |-- V11: integer (nullable = true) |-- V12: integer (nullable = true) |-- V13: integer (nullable = true) |-- V14: integer (nullable = true) |-- V15: integer (nullable = true) |-- V16: integer (nullable = true) |-- V17: integer (nullable = true) |-- V18: integer (nullable = true) |-- V19: integer (nullable = true) |-- V20: integer (nullable = true) |-- V21: integer (nullable = true) |-- V22: integer (nullable = true) |-- V23: integer (nullable = true) |-- V24: integer (nullable = true) |-- V25: double (nullable = true) |-- V26: double (nullable = true) |-- V27: double (nullable = true) |-- V28: double (nullable = true) |-- V29: double (nullable = true) |-- V30: double (nullable = true) |-- V31: double (nullable = true) |-- V32: integer (nullable = true) |-- V33: integer (nullable = true) |-- V34: double (nullable = true) |-- V35: double (nullable = true) |-- V36: double (nullable = true) |-- V37: double (nullable = true) |-- V38: double (nullable = true) |-- V39: double (nullable = true) |-- V40: double (nullable = true) |-- V41: double (nullable = true) |-- V42: string (nullable = true)
Filtrage¶
In [8]:
# Filter on V42 == "normal." and V2 == "udp".
# Very fast: nothing is computed yet, the operation is only prepared.
res = df.where((df["V42"] == "normal.") & (df["V2"] == "udp"))
In [9]:
# Here the filter is actually computed, because a result (the row count) is requested
print(res.count())
382696
Moyennes pour variables numériques¶
In [10]:
# Types of the columns, as (name, type) pairs
df.dtypes
Out[10]:
[('V1', 'int'), ('V2', 'string'), ('V3', 'string'), ('V4', 'string'), ('V5', 'int'), ('V6', 'int'), ('V7', 'int'), ('V8', 'int'), ('V9', 'int'), ('V10', 'int'), ('V11', 'int'), ('V12', 'int'), ('V13', 'int'), ('V14', 'int'), ('V15', 'int'), ('V16', 'int'), ('V17', 'int'), ('V18', 'int'), ('V19', 'int'), ('V20', 'int'), ('V21', 'int'), ('V22', 'int'), ('V23', 'int'), ('V24', 'int'), ('V25', 'double'), ('V26', 'double'), ('V27', 'double'), ('V28', 'double'), ('V29', 'double'), ('V30', 'double'), ('V31', 'double'), ('V32', 'int'), ('V33', 'int'), ('V34', 'double'), ('V35', 'double'), ('V36', 'double'), ('V37', 'double'), ('V38', 'double'), ('V39', 'double'), ('V40', 'double'), ('V41', 'double'), ('V42', 'string')]
In [11]:
# Names of the columns whose type is not "string"
col_numeriques = [nom for nom, dtype in df.dtypes if dtype != "string"]
print(col_numeriques)
['V1', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'V29', 'V30', 'V31', 'V32', 'V33', 'V34', 'V35', 'V36', 'V37', 'V38', 'V39', 'V40', 'V41']
In [12]:
# Restrict the DataFrame to the numeric columns.
# select() accepts the list of names directly, which avoids writing
# df.select('V1', 'V5', 'V6', ...) by hand.
dfNum = df.select(col_numeriques)
# Check the retained columns
print(dfNum.columns)
['V1', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'V29', 'V30', 'V31', 'V32', 'V33', 'V34', 'V35', 'V36', 'V37', 'V38', 'V39', 'V40', 'V41']
In [13]:
# Spark SQL functions
import pyspark.sql.functions as F
# Mean of every numeric column; each result keeps the name of its source column
moyennes = dfNum.agg(
    *[F.avg(nom).alias(nom) for nom in col_numeriques]
)
# Display
moyennes.show()
| V1| V5| V6| V7| V8| V9| V10| V11| V12| V13| V14| V15| V16| V17| V18| V19|V20| V21| V22| V23| V24| V25| V26| V27| V28| V29| V30| V31| V32| V33| V34| V35| V36| V37| V38| V39| V40| V41||48.34243046395876|1834.6211752293746|1093.6228137132073|5.716116037972159E-6|6.487791703098401E-4|7.961733052889793E-6|0.012437656057623349|3.205107921291532E-5|0.14352901980246327|0.008088304193730605|6.81850984529536E-5|3.674646024410674E-5|0.012934958152926926|0.001188747988896...|7.430950849363806E-5|0.001021143300783455|0.0|4.082940027122971E-7|8.351653825480036E-4|334.97344027097654|295.26708613431526|0.1779702806878345|0.17803695305701234|0.05766509112815944|0.05773010174074143|0.7898841751576265|0.021179606286207367|0.028260804326932985|232.98108271811932|189.21424492863122|0.7537132359322468|0.03071110524989229|0.6050520115522975|0.006464106568015736|0.17809114796148975|0.1778858618198701|0.057927803821273834|0.05765941380006276| +-----------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---+--------------------+--------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+------------------+--------------------+--------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+-------------------+------------------+--------------------+-------------------+
Filtrage + Moyennes¶
In [14]:
# Filtering and column selection can be chained
dfFilNum = (
    df.where((df["V42"] == "normal.") & (df["V2"] == "udp"))
    .select(col_numeriques)
)
# Means of the numeric columns computed on the filtered rows only
dfFilNum.agg(
    *[F.avg(nom).alias(nom) for nom in col_numeriques]
).show()
+-----------------+-----------------+-----------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------------+------------------+--------------------+---+--------------------+---+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+--------------------+---+--------------------+---+ | V1| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22| V23| V24| V25|V26| V27|V28| V29| V30| V31| V32| V33| V34| V35| V36| V37| V38|V39| V40|V41| +-----------------+-----------------+-----------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------------+------------------+--------------------+---+--------------------+---+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+--------------------+---+--------------------+---+ |1061.262338775425|98.31598448899388|89.40573196479713|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|10.4667934862136|14.623607249618496|4.123377302088340...|0.0|2.059075610928779E-5|0.0|0.9508702468800143|0.05335164203441275|0.08134430461777149|235.52450509020215|150.32263206304742|0.6258512239480872|0.22505325375755567|0.3214867153041238|0.001413811484833...|1.590296214227477E-4|0.0|0.001447258398310...|0.0| +-----------------+-----------------+-----------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------------+------------------+--------------------+---+--------------------+---+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+--------------------+---+--------------------+---+
Tri de manière décroissante¶
In [15]:
# Sort on V6 in decreasing order (F is the functions alias imported earlier)
# and display the first 5 rows
df.sort(F.col("V6").desc()).show(n=5)
+-----+---+-------+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+---+---+---+---+---+---+----------+ | V1| V2| V3| V4| V5| V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|V29|V30|V31|V32|V33| V34| V35|V36|V37|V38|V39|V40|V41| V42| +-----+---+-------+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+---+---+---+---+---+---+----------+ |10999|tcp| other|RSTR| 0|1309937401| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255| 1| 0.0|0.65|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.| |10999|tcp| other|RSTR| 0|1309937401| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255| 1| 0.0|0.65|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.| |39930|tcp|private|RSTR| 0| 400291060| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 2| 2|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255| 2|0.01| 0.5|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.| |39930|tcp|private|RSTR| 0| 400291060| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 2| 2|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255| 2|0.01| 0.5|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.| |39869|tcp|private|RSTR| 0| 400291060| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 1|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255| 1| 0.0| 0.5|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.| +-----+---+-------+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+---+---+---+---+---+---+----------+ only showing top 5 rows
Comptage des valeurs¶
In [17]:
# Number of rows per value of V42 -- very fast because evaluation is lazy
res = df.groupBy("V42").count()
In [18]:
# The computation runs when the display is requested; largest counts first
res.sort(F.col("count").desc()).show()
+----------------+-------+ | V42| count| +----------------+-------+ | smurf.|5615772| | neptune.|2144034| | normal.|1945562| | satan.| 31784| | ipsweep.| 24962| | portsweep.| 20826| | nmap.| 4632| | back.| 4406| | warezclient.| 2040| | teardrop.| 1958| | pod.| 528| | guess_passwd.| 106| |buffer_overflow.| 60| | land.| 42| | warezmaster.| 40| | imap.| 24| | rootkit.| 20| | loadmodule.| 18| | ftp_write.| 16| | multihop.| 14| +----------------+-------+ only showing top 20 rows
Moyennes conditionnelles (1 critère)¶
In [19]:
# Mean of V34 for each value of V42, sorted by increasing mean
res = df.groupBy("V42").avg("V34").sort("avg(V34)")
# Pass the number of rows to show() so that every row is displayed
res.show(n=res.count())
+----------------+--------------------+ | V42| avg(V34)| +----------------+--------------------+ | portsweep.|0.002928070680879...| | perl.|0.013333333333333334| | satan.|0.014214699219732984| | neptune.| 0.0433713178056304| | spy.| 0.185| | teardrop.| 0.2468335035750767| | rootkit.| 0.306| | nmap.| 0.5267357512953367| | pod.| 0.6597348484848484| | multihop.| 0.7157142857142856| | warezclient.| 0.7354411764705884| | loadmodule.| 0.8355555555555555| | normal.| 0.8448791968592703| | land.| 0.87| | ftp_write.| 0.875| | warezmaster.| 0.9| | imap.| 0.9166666666666666| | ipsweep.| 0.9304687124429131| | phf.| 0.9725| | smurf.| 0.999691383482093| | guess_passwd.| 1.0| |buffer_overflow.| 1.0| | back.| 1.0| +----------------+--------------------+
Moyennes conditionnelles (2 critères)¶
In [20]:
# Mean of V34 for each (V42, V2) combination
res = df.groupBy("V42", "V2").avg("V34")
# Display every row of the result
res.show(n=res.count())
+----------------+----+--------------------+ | V42| V2| avg(V34)| +----------------+----+--------------------+ | satan.|icmp| 0.20972972972972972| | nmap.|icmp| 0.9960077519379846| | back.| tcp| 1.0| | teardrop.| udp| 0.2468335035750767| | ftp_write.| tcp| 0.875| | loadmodule.| tcp| 0.8355555555555555| | smurf.|icmp| 0.999691383482093| | neptune.| tcp| 0.0433713178056304| | normal.|icmp| 0.5197696466348002| | normal.| tcp| 0.9048007987822215| | ipsweep.| tcp| 0.06840909090909081| | land.| tcp| 0.87| | pod.|icmp| 0.6597348484848484| |buffer_overflow.| tcp| 1.0| | satan.| tcp|7.506891920548513E-4| | imap.| tcp| 0.9166666666666666| | phf.| tcp| 0.9725| | guess_passwd.| tcp| 1.0| | satan.| udp| 0.12149882903981081| | ipsweep.|icmp| 0.9993917106515531| | nmap.| tcp|0.003558994197292...| | perl.| tcp|0.013333333333333334| | normal.| udp| 0.6258512239480872| | portsweep.| tcp|0.002734697799558...| | warezmaster.| tcp| 0.9| | multihop.| tcp| 0.7157142857142856| | nmap.| udp| 0.7534399999999992| | warezclient.| tcp| 0.7354411764705884| | spy.| tcp| 0.185| | rootkit.| tcp| 0.15142857142857144| | portsweep.|icmp| 0.3383333333333334| | rootkit.| udp| 0.6666666666666666| +----------------+----+--------------------+
Travailler avec les requêtes SQL¶
In [21]:
# Register the DataFrame under the name "ma_table" so SQL queries can refer to it
df.createOrReplaceTempView("ma_table")
In [22]:
# Define the query: mean of V34 for each value of V42, sorted by decreasing mean
requete = "SELECT V42,AVG(V34) as moyennes FROM ma_table GROUP BY V42 ORDER BY moyennes DESC"
In [23]:
# Run the query and display the result
spark.sql(requete).show()
+----------------+------------------+ | V42| moyennes| +----------------+------------------+ | guess_passwd.| 1.0| |buffer_overflow.| 1.0| | back.| 1.0| | smurf.| 0.999691383482093| | phf.| 0.9725| | ipsweep.|0.9304687124429131| | imap.|0.9166666666666666| | warezmaster.| 0.9| | ftp_write.| 0.875| | land.| 0.87| | normal.|0.8448791968592703| | loadmodule.|0.8355555555555555| | warezclient.|0.7354411764705884| | multihop.|0.7157142857142856| | pod.|0.6597348484848484| | nmap.|0.5267357512953367| | rootkit.| 0.306| | teardrop.|0.2468335035750767| | spy.| 0.185| | neptune.|0.0433713178056304| +----------------+------------------+ only showing top 20 rows
Stopper la session¶
In [24]:
# Stop the Spark session
spark.stop()