Version¶

In [1]:
# Check which version of PySpark is installed
import pyspark
pyspark.__version__
Out[1]:
'3.5.4'

Chargement et inspection des données¶

In [2]:
# Change the current working directory so that the relative data path used
# later resolves. The directory can be overridden with the DEMO_WORKDIR
# environment variable; the original location remains the default, so the
# behaviour is unchanged when the variable is not set.
import os
os.chdir(os.environ.get("DEMO_WORKDIR", "C:/Users/ricco/Desktop/demo"))
In [3]:
# Create the Spark session
from pyspark.sql import SparkSession

# Local mode limited to 4 worker threads. In local mode the parallelism is
# set by the master URL: "local[4]" runs 4 threads, whereas "local[*]" uses
# every logical core of the machine whatever spark.executor.cores says.
# Executor memory is requested at 1 GB.
# NOTE(review): in local mode the executor runs inside the driver JVM, so
# spark.driver.memory may be the setting that actually bounds memory - confirm.
spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.executor.memory", "1g")
    .config("spark.executor.cores", "4")
    .getOrCreate()
)

# Type of the object
print(type(spark))
<class 'pyspark.sql.session.SparkSession'>
In [4]:
# Load the file: the first row holds the column names and
# the type of each column is guessed from the data
df = spark.read.csv(
    path="kddcup99twice.txt",
    header=True,
    inferSchema=True,
)

# Type of the object
print(type(df))
<class 'pyspark.sql.dataframe.DataFrame'>
In [5]:
# Dimensions of the table: number of rows, then number of columns
n_lignes = df.count()
n_colonnes = len(df.columns)
print(f"Lignes : {n_lignes}")
print(f"Colonnes : {n_colonnes}")
Lignes : 9796862
Colonnes : 42
In [6]:
# Display the first 5 rows
df.show(n=5)
+---+---+----+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+
| V1| V2|  V3| V4| V5|   V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|V29|V30|V31|V32|V33|V34|V35|V36|V37|V38|V39|V40|V41|    V42|
+---+---+----+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+
|  0|tcp|http| SF|215|45076|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|0.0|0.0|0.0|0.0|1.0|0.0|0.0|  0|  0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|normal.|
|  0|tcp|http| SF|215|45076|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|0.0|0.0|0.0|0.0|1.0|0.0|0.0|  0|  0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|normal.|
|  0|tcp|http| SF|162| 4528|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  2|0.0|0.0|0.0|0.0|1.0|0.0|0.0|  1|  1|1.0|0.0|1.0|0.0|0.0|0.0|0.0|0.0|normal.|
|  0|tcp|http| SF|162| 4528|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  2|0.0|0.0|0.0|0.0|1.0|0.0|0.0|  1|  1|1.0|0.0|1.0|0.0|0.0|0.0|0.0|0.0|normal.|
|  0|tcp|http| SF|236| 1228|  0|  0|  0|  0|  0|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|0.0|0.0|0.0|0.0|1.0|0.0|0.0|  2|  2|1.0|0.0|0.5|0.0|0.0|0.0|0.0|0.0|normal.|
+---+---+----+---+---+-----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-------+
only showing top 5 rows

In [7]:
# Structure of the table: name, type and nullability of each column
df.printSchema()
root
 |-- V1: integer (nullable = true)
 |-- V2: string (nullable = true)
 |-- V3: string (nullable = true)
 |-- V4: string (nullable = true)
 |-- V5: integer (nullable = true)
 |-- V6: integer (nullable = true)
 |-- V7: integer (nullable = true)
 |-- V8: integer (nullable = true)
 |-- V9: integer (nullable = true)
 |-- V10: integer (nullable = true)
 |-- V11: integer (nullable = true)
 |-- V12: integer (nullable = true)
 |-- V13: integer (nullable = true)
 |-- V14: integer (nullable = true)
 |-- V15: integer (nullable = true)
 |-- V16: integer (nullable = true)
 |-- V17: integer (nullable = true)
 |-- V18: integer (nullable = true)
 |-- V19: integer (nullable = true)
 |-- V20: integer (nullable = true)
 |-- V21: integer (nullable = true)
 |-- V22: integer (nullable = true)
 |-- V23: integer (nullable = true)
 |-- V24: integer (nullable = true)
 |-- V25: double (nullable = true)
 |-- V26: double (nullable = true)
 |-- V27: double (nullable = true)
 |-- V28: double (nullable = true)
 |-- V29: double (nullable = true)
 |-- V30: double (nullable = true)
 |-- V31: double (nullable = true)
 |-- V32: integer (nullable = true)
 |-- V33: integer (nullable = true)
 |-- V34: double (nullable = true)
 |-- V35: double (nullable = true)
 |-- V36: double (nullable = true)
 |-- V37: double (nullable = true)
 |-- V38: double (nullable = true)
 |-- V39: double (nullable = true)
 |-- V40: double (nullable = true)
 |-- V41: double (nullable = true)
 |-- V42: string (nullable = true)

Filtrage¶

In [8]:
# filter() returns almost instantly: nothing is actually computed here,
# the operation is only prepared (lazy evaluation)
est_normal = df.V42 == "normal."
est_udp = df.V2 == "udp"
res = df.filter(est_normal & est_udp)
In [9]:
# Here the computation really happens: counting forces the filter to run
n_filtrees = res.count()
print(n_filtrees)
382696

Moyennes pour variables numériques¶

In [10]:
# Types of the variables, as a list of (column name, type name) pairs
df.dtypes
Out[10]:
[('V1', 'int'),
 ('V2', 'string'),
 ('V3', 'string'),
 ('V4', 'string'),
 ('V5', 'int'),
 ('V6', 'int'),
 ('V7', 'int'),
 ('V8', 'int'),
 ('V9', 'int'),
 ('V10', 'int'),
 ('V11', 'int'),
 ('V12', 'int'),
 ('V13', 'int'),
 ('V14', 'int'),
 ('V15', 'int'),
 ('V16', 'int'),
 ('V17', 'int'),
 ('V18', 'int'),
 ('V19', 'int'),
 ('V20', 'int'),
 ('V21', 'int'),
 ('V22', 'int'),
 ('V23', 'int'),
 ('V24', 'int'),
 ('V25', 'double'),
 ('V26', 'double'),
 ('V27', 'double'),
 ('V28', 'double'),
 ('V29', 'double'),
 ('V30', 'double'),
 ('V31', 'double'),
 ('V32', 'int'),
 ('V33', 'int'),
 ('V34', 'double'),
 ('V35', 'double'),
 ('V36', 'double'),
 ('V37', 'double'),
 ('V38', 'double'),
 ('V39', 'double'),
 ('V40', 'double'),
 ('V41', 'double'),
 ('V42', 'string')]
In [11]:
# Names of the variables whose type is not "string", i.e. the numeric ones
col_numeriques = [
    nom
    for nom, type_col in df.dtypes
    if type_col != 'string'
]
print(col_numeriques)
['V1', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'V29', 'V30', 'V31', 'V32', 'V33', 'V34', 'V35', 'V36', 'V37', 'V38', 'V39', 'V40', 'V41']
In [12]:
# Build the DataFrame restricted to the numeric columns.
# The * unpacks the list into separate arguments;
# otherwise we would have had to write df.select('V1','V5','V6'...)
dfNum = df.select(*col_numeriques)

# Check: names of the columns that were kept (the means come in the next cell)
print(dfNum.columns)
['V1', 'V5', 'V6', 'V7', 'V8', 'V9', 'V10', 'V11', 'V12', 'V13', 'V14', 'V15', 'V16', 'V17', 'V18', 'V19', 'V20', 'V21', 'V22', 'V23', 'V24', 'V25', 'V26', 'V27', 'V28', 'V29', 'V30', 'V31', 'V32', 'V33', 'V34', 'V35', 'V36', 'V37', 'V38', 'V39', 'V40', 'V41']
In [13]:
# Spark SQL functions, under the alias F
import pyspark.sql.functions as F

# One avg() expression per numeric column, aliased with the column's
# own name so the result keeps the original headers
expr_moyennes = [F.avg(nom).alias(nom) for nom in col_numeriques]
moyennes = dfNum.agg(*expr_moyennes)

# Display the single row of means
moyennes.show()

|               V1|                V5|                V6|                  V7|                  V8|                  V9|                 V10|                 V11|                V12|                 V13|                V14|                 V15|                 V16|                 V17|                 V18|                 V19|V20|                 V21|                 V22|               V23|               V24|               V25|                V26|                V27|                V28|               V29|                 V30|                 V31|               V32|               V33|               V34|                V35|               V36|                 V37|                V38|               V39|                 V40|                V41|

|48.34243046395876|1834.6211752293746|1093.6228137132073|5.716116037972159E-6|6.487791703098401E-4|7.961733052889793E-6|0.012437656057623349|3.205107921291532E-5|0.14352901980246327|0.008088304193730605|6.81850984529536E-5|3.674646024410674E-5|0.012934958152926926|0.001188747988896...|7.430950849363806E-5|0.001021143300783455|0.0|4.082940027122971E-7|8.351653825480036E-4|334.97344027097654|295.26708613431526|0.1779702806878345|0.17803695305701234|0.05766509112815944|0.05773010174074143|0.7898841751576265|0.021179606286207367|0.028260804326932985|232.98108271811932|189.21424492863122|0.7537132359322468|0.03071110524989229|0.6050520115522975|0.006464106568015736|0.17809114796148975|0.1778858618198701|0.057927803821273834|0.05765941380006276|


Filtrage + Moyennes¶

In [14]:
# Filtering and column selection can be chained
dfFilNum = (
    df.filter((df.V42 == "normal.") & (df.V2 == "udp"))
    .select(*col_numeriques)
)

# Means of the numeric columns over the filtered rows only
expr_moyennes_fil = [F.avg(nom).alias(nom) for nom in col_numeriques]
dfFilNum.agg(*expr_moyennes_fil).show()
+-----------------+-----------------+-----------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------------+------------------+--------------------+---+--------------------+---+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+--------------------+---+--------------------+---+
|               V1|               V5|               V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|             V23|               V24|                 V25|V26|                 V27|V28|               V29|                V30|                V31|               V32|               V33|               V34|                V35|               V36|                 V37|                 V38|V39|                 V40|V41|
+-----------------+-----------------+-----------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------------+------------------+--------------------+---+--------------------+---+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+--------------------+---+--------------------+---+
|1061.262338775425|98.31598448899388|89.40573196479713|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|10.4667934862136|14.623607249618496|4.123377302088340...|0.0|2.059075610928779E-5|0.0|0.9508702468800143|0.05335164203441275|0.08134430461777149|235.52450509020215|150.32263206304742|0.6258512239480872|0.22505325375755567|0.3214867153041238|0.001413811484833...|1.590296214227477E-4|0.0|0.001447258398310...|0.0|
+-----------------+-----------------+-----------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------------+------------------+--------------------+---+--------------------+---+------------------+-------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+--------------------+--------------------+---+--------------------+---+

Tri de manière décroissante¶

In [15]:
# Sort on V6 in decreasing order (F.desc, from the alias F imported earlier),
# then display the first 5 rows
tri_v6 = df.orderBy(F.desc('V6'))
tri_v6.show(5)
+-----+---+-------+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+---+---+---+---+---+---+----------+
|   V1| V2|     V3|  V4| V5|        V6| V7| V8| V9|V10|V11|V12|V13|V14|V15|V16|V17|V18|V19|V20|V21|V22|V23|V24|V25|V26|V27|V28|V29|V30|V31|V32|V33| V34| V35|V36|V37|V38|V39|V40|V41|       V42|
+-----+---+-------+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+---+---+---+---+---+---+----------+
|10999|tcp|  other|RSTR|  0|1309937401|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255|  1| 0.0|0.65|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.|
|10999|tcp|  other|RSTR|  0|1309937401|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255|  1| 0.0|0.65|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.|
|39930|tcp|private|RSTR|  0| 400291060|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  2|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255|  2|0.01| 0.5|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.|
|39930|tcp|private|RSTR|  0| 400291060|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  2|  2|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255|  2|0.01| 0.5|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.|
|39869|tcp|private|RSTR|  0| 400291060|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  1|  1|0.0|0.0|1.0|1.0|1.0|0.0|0.0|255|  1| 0.0| 0.5|1.0|0.0|0.0|0.0|1.0|1.0|portsweep.|
+-----+---+-------+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+---+---+---+---+---+---+----------+
only showing top 5 rows

Comptage des valeurs¶

In [17]:
# Number of rows per value of V42 - returns immediately because
# evaluation is lazy
groupes_v42 = df.groupby('V42')
res = groupes_v42.count()
In [18]:
# The computation runs when the display is requested;
# rows are sorted by decreasing count
comptage_trie = res.orderBy(F.desc('count'))
comptage_trie.show()
+----------------+-------+
|             V42|  count|
+----------------+-------+
|          smurf.|5615772|
|        neptune.|2144034|
|         normal.|1945562|
|          satan.|  31784|
|        ipsweep.|  24962|
|      portsweep.|  20826|
|           nmap.|   4632|
|           back.|   4406|
|    warezclient.|   2040|
|       teardrop.|   1958|
|            pod.|    528|
|   guess_passwd.|    106|
|buffer_overflow.|     60|
|           land.|     42|
|    warezmaster.|     40|
|           imap.|     24|
|        rootkit.|     20|
|     loadmodule.|     18|
|      ftp_write.|     16|
|       multihop.|     14|
+----------------+-------+
only showing top 20 rows

Moyennes conditionnelles (1 critère)¶

In [19]:
# Mean of V34 for each value of V42, sorted by increasing mean
moy_v34 = df.groupby('V42').avg('V34')
res = moy_v34.orderBy('avg(V34)')

# Pass the number of rows to show() so that every row is displayed
n_groupes = res.count()
res.show(n_groupes)
+----------------+--------------------+
|             V42|            avg(V34)|
+----------------+--------------------+
|      portsweep.|0.002928070680879...|
|           perl.|0.013333333333333334|
|          satan.|0.014214699219732984|
|        neptune.|  0.0433713178056304|
|            spy.|               0.185|
|       teardrop.|  0.2468335035750767|
|        rootkit.|               0.306|
|           nmap.|  0.5267357512953367|
|            pod.|  0.6597348484848484|
|       multihop.|  0.7157142857142856|
|    warezclient.|  0.7354411764705884|
|     loadmodule.|  0.8355555555555555|
|         normal.|  0.8448791968592703|
|           land.|                0.87|
|      ftp_write.|               0.875|
|    warezmaster.|                 0.9|
|           imap.|  0.9166666666666666|
|        ipsweep.|  0.9304687124429131|
|            phf.|              0.9725|
|          smurf.|   0.999691383482093|
|   guess_passwd.|                 1.0|
|buffer_overflow.|                 1.0|
|           back.|                 1.0|
+----------------+--------------------+

Moyennes conditionnelles (2 critères)¶

In [20]:
# Mean of V34 for each (V42, V2) combination
groupes_v42_v2 = df.groupby('V42', 'V2')
res = groupes_v42_v2.avg('V34')

# Display every row of the result
n_groupes = res.count()
res.show(n_groupes)
+----------------+----+--------------------+
|             V42|  V2|            avg(V34)|
+----------------+----+--------------------+
|          satan.|icmp| 0.20972972972972972|
|           nmap.|icmp|  0.9960077519379846|
|           back.| tcp|                 1.0|
|       teardrop.| udp|  0.2468335035750767|
|      ftp_write.| tcp|               0.875|
|     loadmodule.| tcp|  0.8355555555555555|
|          smurf.|icmp|   0.999691383482093|
|        neptune.| tcp|  0.0433713178056304|
|         normal.|icmp|  0.5197696466348002|
|         normal.| tcp|  0.9048007987822215|
|        ipsweep.| tcp| 0.06840909090909081|
|           land.| tcp|                0.87|
|            pod.|icmp|  0.6597348484848484|
|buffer_overflow.| tcp|                 1.0|
|          satan.| tcp|7.506891920548513E-4|
|           imap.| tcp|  0.9166666666666666|
|            phf.| tcp|              0.9725|
|   guess_passwd.| tcp|                 1.0|
|          satan.| udp| 0.12149882903981081|
|        ipsweep.|icmp|  0.9993917106515531|
|           nmap.| tcp|0.003558994197292...|
|           perl.| tcp|0.013333333333333334|
|         normal.| udp|  0.6258512239480872|
|      portsweep.| tcp|0.002734697799558...|
|    warezmaster.| tcp|                 0.9|
|       multihop.| tcp|  0.7157142857142856|
|           nmap.| udp|  0.7534399999999992|
|    warezclient.| tcp|  0.7354411764705884|
|            spy.| tcp|               0.185|
|        rootkit.| tcp| 0.15142857142857144|
|      portsweep.|icmp|  0.3383333333333334|
|        rootkit.| udp|  0.6666666666666666|
+----------------+----+--------------------+

Travailler avec les requêtes SQL¶

In [21]:
# Register the DataFrame under the name "ma_table" so that SQL queries can refer to it
df.createOrReplaceTempView("ma_table")
In [22]:
# Define the query: mean of V34 for each value of V42, sorted by decreasing mean
requete = "SELECT V42,AVG(V34) as moyennes FROM ma_table GROUP BY V42 ORDER BY moyennes DESC"
In [23]:
# Run the SQL query and display the result (show() prints at most 20 rows by default)
res_sql = spark.sql(requete)
res_sql.show()
+----------------+------------------+
|             V42|          moyennes|
+----------------+------------------+
|   guess_passwd.|               1.0|
|buffer_overflow.|               1.0|
|           back.|               1.0|
|          smurf.| 0.999691383482093|
|            phf.|            0.9725|
|        ipsweep.|0.9304687124429131|
|           imap.|0.9166666666666666|
|    warezmaster.|               0.9|
|      ftp_write.|             0.875|
|           land.|              0.87|
|         normal.|0.8448791968592703|
|     loadmodule.|0.8355555555555555|
|    warezclient.|0.7354411764705884|
|       multihop.|0.7157142857142856|
|            pod.|0.6597348484848484|
|           nmap.|0.5267357512953367|
|        rootkit.|             0.306|
|       teardrop.|0.2468335035750767|
|            spy.|             0.185|
|        neptune.|0.0433713178056304|
+----------------+------------------+
only showing top 20 rows

Stopper la session¶

In [24]:
# Stop the Spark session
spark.stop()