Das Vorverarbeiten von Daten ist mit der wichtigste Schritt im Prozess des Trainings eines Machine-Learning-Modells. Lernt unser Modell auf unsauberen Daten, werden auch die errechneten Ergebnisse unsauber sein, ganz nach dem Motto: „Ein Schüler ist nur so gut wie sein Lehrer“. Um mit Daten aller Art zu arbeiten wird global vorrangig die Programmiersprache Python eingesetzt [1]. Diese besticht vor allem durch eine riesengroße Auswahl an Bibliotheken, zu denen auch polars gehört.
Was ist Polars?
Nachdem man sich Datenmanipulation ohne pandas nicht mehr vorstellen kann, gibt es mit polars nun einen neuen Stern am DataFrame-Himmel. Das open-source Projekt entstand 2020 und gewann seitdem, dank seiner Performanz und seines einfachen Interfaces, an Popularität. polars ist besonders schnell, da es auf der maschinennahen Programmiersprache Rust aufgebaut ist. Anfragen an die Daten ähneln SQL-Datenbank Kommandos, was diese intuitiv gestaltet.
Zeitreihen
Wenn ein bestimmter Wert über die Spanne einer Zeit aufgenommen wird, nennt man das eine Zeitreihe. Solche Daten können genutzt werden, um Machine-Learning-Modelle zu trainieren, welche Prognosen berechnen, Anomalien feststellen, oder Zeitreihen klassifizieren.
Als minimalen Testdatensatz verwenden wir halb-stündliche Werte der Außentemperatur. Den Datensatz finden Sie in der Datei preprocessing_mit_polars_testdata
Preprocessing von Zeitreihen
Datenanalyse
Bevor man sich Hals über Kopf in das Training des Machine-Learning-Modells stürzt, sollte man sich erst einmal einen Überblick über die Daten verschaffen. Dabei fallen etwaige Fehler in den Daten meist schon auf.
Zuerst müssen die polars Bibliothek und unsere Daten geladen werden. Haben wir die Daten als .csv Datei vorliegen, können wir die Funktion read_csv()
nutzen. Um Nullwerte zu erkennen, setzen wir null_values
auf "null"
.
import polars as pl
input_data_file = "preprocessing_mit_polars_testdaten.csv"
df = pl.read_csv(input_data_file, null_values="null")
shape: (665, 3)
┌──────────────────────────┬─────────────┬───────┐
│ datetime ┆ temperature ┆ id │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ i64 │
╞══════════════════════════╪═════════════╪═══════╡
│ 2021-07-08T15:30:00.000Z ┆ 2149 ┆ 38574 │
│ 2021-07-08T16:00:00.000Z ┆ 2195 ┆ 38574 │
│ 2021-07-08T16:30:00.000Z ┆ 1942 ┆ 38574 │
│ 2021-07-08T17:31:00.000Z ┆ 1993 ┆ 38574 │
│ 2021-07-08T18:31:00.000Z ┆ 1869 ┆ 38574 │
│ … ┆ … ┆ … │
│ 2021-07-25T02:30:00.000Z ┆ 1628 ┆ 38574 │
│ 2021-07-25T03:00:00.000Z ┆ 1635 ┆ 38574 │
│ 2021-07-25T03:30:00.000Z ┆ 1651 ┆ 38574 │
│ 2021-07-25T04:00:00.000Z ┆ 1640 ┆ 38574 │
│ 2021-07-25T04:30:00.000Z ┆ 1660 ┆ 38574 │
└──────────────────────────┴─────────────┴───────┘
Wir erhalten ein DataFrame mit 665 Einträgen und 3 Spalten, wobei das Datum und die Temperatur nicht das richtige Format haben. Es sieht so aus, als hätten wir nur eine id
, also nur Werte von einem Sensor. Dies können wir nachprüfen:
df.describe()
┌────────────┬──────────────────────────┬─────────────┬─────────┐
│ statistic ┆ datetime ┆ temperature ┆ id │
│ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 ┆ f64 │
╞════════════╪══════════════════════════╪═════════════╪═════════╡
│ count ┆ 665 ┆ 659.0 ┆ 665.0 │
│ null_count ┆ 0 ┆ 6.0 ┆ 0.0 │
│ mean ┆ null ┆ 1943.775417 ┆ 38574.0 │
│ std ┆ null ┆ 499.790282 ┆ 0.0 │
│ min ┆ 2021-07-08T15:30:00.000Z ┆ 1103.0 ┆ 38574.0 │
│ 25% ┆ null ┆ 1598.0 ┆ 38574.0 │
│ 50% ┆ null ┆ 1845.0 ┆ 38574.0 │
│ 75% ┆ null ┆ 2266.0 ┆ 38574.0 │
│ max ┆ 2021-07-25T04:30:00.000Z ┆ 8526.0 ┆ 38574.0 │
└────────────┴──────────────────────────┴─────────────┴─────────┘
df.describe()
gibt uns statistische Werte über die einzelnen Spalten aus. Hier können wir sehen, dass wir nur eine id=38574
vorliegen haben, also können wir diese Spalte verwerfen.
Datenmanipulation mit select
Wir verwerfen Spalten, indem wir nur die von Interesse auswählen, und zwar die Spalten datetime
und temperature
. Das funktioniert in polars über das von SQL-Queries bekannte SELECT
. Dieses ist eines von polars Expression Befehlen. Expressions geben ein DataFrame zurück und ändern das DataFrame nicht, auf welchem sie aufgerufen werden. Deswegen erstellen wir ein neues DataFrame Objekt, welches die Änderungen speichert.
df = df.select(
pl.col("datetime"),
pl.col("temperature")
)
┌──────────────────────────┬─────────────┐
│ datetime ┆ temperature │
│ --- ┆ --- │
│ str ┆ i64 │
╞══════════════════════════╪═════════════╡
│ 2021-07-08T15:30:00.000Z ┆ 2149 │
│ 2021-07-08T16:00:00.000Z ┆ 2195 │
│ 2021-07-08T16:30:00.000Z ┆ 1942 │
│ 2021-07-08T17:31:00.000Z ┆ 1993 │
│ 2021-07-08T18:31:00.000Z ┆ 1869 │
│ … ┆ … │
│ 2021-07-25T02:30:00.000Z ┆ 1628 │
│ 2021-07-25T03:00:00.000Z ┆ 1635 │
│ 2021-07-25T03:30:00.000Z ┆ 1651 │
│ 2021-07-25T04:00:00.000Z ┆ 1640 │
│ 2021-07-25T04:30:00.000Z ┆ 1660 │
└──────────────────────────┴─────────────┘
Wir haben die Temperatur als Integer ohne Komma vorliegen. Die Werte können wir während des select-Vorgangs in Kommazahlen umwandeln, indem wir die Umrechnungsformel auf die ganze Spalte anwenden.
df = df.select(
pl.col("datetime"),
(pl.col("temperature") / 100)
)
┌──────────────────────────┬─────────────┐
│ datetime ┆ temperature │
│ --- ┆ --- │
│ str ┆ f64 │
╞══════════════════════════╪═════════════╡
│ 2021-07-08T15:30:00.000Z ┆ 21.49 │
│ 2021-07-08T16:00:00.000Z ┆ 21.95 │
│ 2021-07-08T16:30:00.000Z ┆ 19.42 │
│ 2021-07-08T17:31:00.000Z ┆ 19.93 │
│ 2021-07-08T18:31:00.000Z ┆ 18.69 │
│ … ┆ … │
│ 2021-07-25T02:30:00.000Z ┆ 16.28 │
│ 2021-07-25T03:00:00.000Z ┆ 16.35 │
│ 2021-07-25T03:30:00.000Z ┆ 16.51 │
│ 2021-07-25T04:00:00.000Z ┆ 16.4 │
│ 2021-07-25T04:30:00.000Z ┆ 16.6 │
└──────────────────────────┴─────────────┘
Damit sind die Werte auch im richtigen Format, nämlich float64
.
Jetzt muss noch die Zeitspalte von string
in das datetime
Format gebracht werden. Dieses wird es uns ermöglichen, mit den Daten- und Zeitangaben zu arbeiten.
df.select(
pl.col("datetime").str.to_datetime("%Y-%m-%dT%H:%M:%S%.fZ"),
(pl.col("temperature") / 100)
)
┌─────────────────────┬─────────────┐
│ datetime ┆ temperature │
│ --- ┆ --- │
│ datetime[μs] ┆ f64 │
╞═════════════════════╪═════════════╡
│ 2021-07-08 15:30:00 ┆ 21.49 │
│ 2021-07-08 16:00:00 ┆ 21.95 │
│ 2021-07-08 16:30:00 ┆ 19.42 │
│ 2021-07-08 17:31:00 ┆ 19.93 │
│ 2021-07-08 18:31:00 ┆ 18.69 │
│ … ┆ … │
│ 2021-07-25 02:30:00 ┆ 16.28 │
│ 2021-07-25 03:00:00 ┆ 16.35 │
│ 2021-07-25 03:30:00 ┆ 16.51 │
│ 2021-07-25 04:00:00 ┆ 16.4 │
│ 2021-07-25 04:30:00 ┆ 16.6 │
└─────────────────────┴─────────────┘
Wir nehmen den string
aus der Spalte datetime
und wandeln ihn in ein polars datetime
Objekt um. Das ganze verbinden wir mit dem ersten Schritt, bei welchem wir die Temperaturangaben formatiert haben.
Datenreinigung mit filter
Gemessene Daten über Zeit beinhalten fast immer Fehler. Diese können beim Messen, der Übertragung, oder der Speicherung geschehen.
Nachdem wir unser DataFrame in das richtige Format gebracht haben, können wir wieder df.describe()
ausführen.
df.describe()
┌────────────┬────────────────────────────┬─────────────┐
│ statistic ┆ datetime ┆ temperature │
│ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 │
╞════════════╪════════════════════════════╪═════════════╡
│ count ┆ 665 ┆ 659.0 │
│ null_count ┆ 0 ┆ 6.0 │
│ mean ┆ 2021-07-17 10:01:08.390977 ┆ 19.437754 │
│ std ┆ null ┆ 4.997903 │
│ min ┆ 2021-07-08 15:30:00 ┆ 11.03 │
│ 25% ┆ 2021-07-13 05:00:00 ┆ 15.98 │
│ 50% ┆ 2021-07-17 14:30:00 ┆ 18.45 │
│ 75% ┆ 2021-07-21 12:30:00 ┆ 22.66 │
│ max ┆ 2021-07-25 04:30:00 ┆ 85.26 │
└────────────┴────────────────────────────┴─────────────┘
Aus den Statistiken lässt sich ablesen, dass unsere Werte in einer Zeitspanne vom 08.07.2021-25.07.2021 aufgenommen wurden, somit über 18 Tage.
Außerdem sehen wir, dass wir 6 Null-Werte für die Temperatur vorliegen haben. Das bedeutet, dass wir fehlenden Werte in unserem Datensatz haben. Zudem weist die Temperatur einen ungewöhnlich hohen Maximalwert von 85.26 auf, welcher wahrscheinlich durch eine der Übertragungslücken entstanden ist.
Zu hohe oder zu niedrige Werte können wir mit der polars Expression filter
löschen, indem wir festlegen, dass sich die Temperaturwerte zwischen 50 und -50 befinden müssen. Diese beiden Konditionen wenden wir auf die Spalte temperature
an:
df = df.filter(
(pl.col("temperature") < 50) & (pl.col("temperature") > -50)
)
df.shape
(657, 2)
df.shape
gibt uns die Dimensionen des DataFrames zurück. Wir können beobachten, dass die Reihenanzahl von 665 auf 657 geschrumpft ist.
Rufen wir noch einmal df.describe()
auf, sehen wir, dass die fehlerhaften Werte und die Nullwerte gefiltert wurden, also die entsprechenden Zeilen entfernt wurden.
df.describe()
┌────────────┬────────────────────────────┬─────────────┐
│ statistic ┆ datetime ┆ temperature │
│ --- ┆ --- ┆ --- │
│ str ┆ str ┆ f64 │
╞════════════╪════════════════════════════╪═════════════╡
│ count ┆ 657 ┆ 657.0 │
│ null_count ┆ 0 ┆ 0.0 │
│ mean ┆ 2021-07-17 08:53:26.210045 ┆ 19.287458 │
│ std ┆ null ┆ 4.095826 │
│ min ┆ 2021-07-08 15:30:00 ┆ 11.03 │
│ 25% ┆ 2021-07-13 04:01:00 ┆ 15.97 │
│ 50% ┆ 2021-07-17 12:30:00 ┆ 18.43 │
│ 75% ┆ 2021-07-21 11:00:00 ┆ 22.63 │
│ max ┆ 2021-07-25 04:30:00 ┆ 28.04 │
└────────────┴────────────────────────────┴─────────────┘
Ändern der Datenfrequenz mit with_columns
und group_by
In unserem Kontext möchten wir die Daten vielleicht von halb-stündlichen in stündliche Daten umwandeln. Dafür benötigen wir die Spalten dates
und hour
. Diese können wir mit with_columns
aus den Daten der Spalte datetime
erstellen:
df.with_columns(
pl.col("datetime").dt.date().alias("date"),
pl.col("datetime").dt.hour().alias("hour")
)
┌─────────────────────┬─────────────┬────────────┬──────┐
│ datetime ┆ temperature ┆ date ┆ hour │
│ --- ┆ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ f64 ┆ date ┆ i8 │
╞═════════════════════╪═════════════╪════════════╪══════╡
│ 2021-07-08 15:30:00 ┆ 21.49 ┆ 2021-07-08 ┆ 15 │
│ 2021-07-08 16:00:00 ┆ 21.95 ┆ 2021-07-08 ┆ 16 │
│ 2021-07-08 16:30:00 ┆ 19.42 ┆ 2021-07-08 ┆ 16 │
│ 2021-07-08 17:31:00 ┆ 19.93 ┆ 2021-07-08 ┆ 17 │
│ 2021-07-08 18:31:00 ┆ 18.69 ┆ 2021-07-08 ┆ 18 │
│ … ┆ … ┆ … ┆ … │
│ 2021-07-25 02:30:00 ┆ 16.28 ┆ 2021-07-25 ┆ 2 │
│ 2021-07-25 03:00:00 ┆ 16.35 ┆ 2021-07-25 ┆ 3 │
│ 2021-07-25 03:30:00 ┆ 16.51 ┆ 2021-07-25 ┆ 3 │
│ 2021-07-25 04:00:00 ┆ 16.4 ┆ 2021-07-25 ┆ 4 │
│ 2021-07-25 04:30:00 ┆ 16.6 ┆ 2021-07-25 ┆ 4 │
└─────────────────────┴─────────────┴────────────┴──────┘
Die Werte wurden meistens im 30-Minuten Takt aufgenommen. Manchmal kommt es aber auch vor, dass eine Stunde zwischen zwei Messungen liegt. Um die Daten zu vereinheitlichen, werden wir sie mit group_by
zu stündlichen Werten gruppieren. Von den gruppierten Temperaturwerten nehmen wir den Durchschnitt. Außerdem sortieren wir die Werte nach dem Gruppieren anhand des Datums und der Stunde.
df.group_by(
[pl.col("date"), pl.col("hour")]
).agg(
pl.col("temperature").mean().round(2)
).sort(
[pl.col("date"), pl.col("hour")]
)
shape: (370, 3)
┌────────────┬──────┬─────────────┐
│ date ┆ hour ┆ temperature │
│ --- ┆ --- ┆ --- │
│ date ┆ i8 ┆ f64 │
╞════════════╪══════╪═════════════╡
│ 2021-07-08 ┆ 15 ┆ 21.49 │
│ 2021-07-08 ┆ 16 ┆ 20.69 │
│ 2021-07-08 ┆ 17 ┆ 19.93 │
│ 2021-07-08 ┆ 18 ┆ 18.69 │
│ 2021-07-08 ┆ 19 ┆ 17.78 │
│ … ┆ … ┆ … │
│ 2021-07-25 ┆ 0 ┆ 17.22 │
│ 2021-07-25 ┆ 1 ┆ 16.99 │
│ 2021-07-25 ┆ 2 ┆ 16.45 │
│ 2021-07-25 ┆ 3 ┆ 16.43 │
│ 2021-07-25 ┆ 4 ┆ 16.5 │
└────────────┴──────┴─────────────┘
Unsere Vermutung war, dass die Daten nicht immer halbstündlich aufgenommen wurden. Wenn wir uns die Anzahl der Werte in jeder Gruppe mit ausgeben lassen, sehen wir, dass es pro Stunde manchmal nur eine und manchmal zwei Aufzeichnungen gibt:
df.group_by(
[pl.col("date"), pl.col("hour")]
).agg(
pl.col("temperature").mean().round(2),
pl.col("temperature").count().alias("count") # zählen der aggregierten Werte
).sort(
[pl.col("date"), pl.col("hour")]
)
shape: (370, 4)
┌────────────┬──────┬─────────────┬───────┐
│ date ┆ hour ┆ temperature ┆ count │
│ --- ┆ --- ┆ --- ┆ --- │
│ date ┆ i8 ┆ f64 ┆ u32 │
╞════════════╪══════╪═════════════╪═══════╡
│ 2021-07-08 ┆ 15 ┆ 21.49 ┆ 1 │
│ 2021-07-08 ┆ 16 ┆ 20.69 ┆ 2 │
│ 2021-07-08 ┆ 17 ┆ 19.93 ┆ 1 │
│ 2021-07-08 ┆ 18 ┆ 18.69 ┆ 1 │
│ 2021-07-08 ┆ 19 ┆ 17.78 ┆ 1 │
│ … ┆ … ┆ … ┆ … │
│ 2021-07-25 ┆ 0 ┆ 17.22 ┆ 1 │
│ 2021-07-25 ┆ 1 ┆ 16.99 ┆ 2 │
│ 2021-07-25 ┆ 2 ┆ 16.45 ┆ 2 │
│ 2021-07-25 ┆ 3 ┆ 16.43 ┆ 2 │
│ 2021-07-25 ┆ 4 ┆ 16.5 ┆ 2 │
└────────────┴──────┴─────────────┴───────┘
Wir können die Daten auch direkt in tägliche Werte umwandeln:
df.group_by(
pl.col("datetime").dt.date().alias("date")
).agg(
pl.col("temperature").mean().round(2)
).sort(
pl.col("date")
)
shape: (18, 2)
┌────────────┬─────────────┐
│ date ┆ temperature │
│ --- ┆ --- │
│ date ┆ f64 │
╞════════════╪═════════════╡
│ 2021-07-08 ┆ 18.63 │
│ 2021-07-09 ┆ 16.07 │
│ 2021-07-10 ┆ 19.02 │
│ 2021-07-11 ┆ 17.84 │
│ 2021-07-12 ┆ 20.05 │
│ … ┆ … │
│ 2021-07-21 ┆ 18.72 │
│ 2021-07-22 ┆ 20.56 │
│ 2021-07-23 ┆ 20.52 │
│ 2021-07-24 ┆ 21.04 │
│ 2021-07-25 ┆ 16.66 │
└────────────┴─────────────┘
Somit erhalten wir unser verarbeitetes polars DataFrame, was für das Training eines Machine-Learning-Modells genutzt werden kann.
Performancevorteil mit Lazy API
Alle bisher gezeigten Codebeispielen nutzen die Eager API von polars. Das bedeutet, der Code wird sofort auf den Daten ausgeführt, und wir erhalten ein DataFrame zurück. Um einen tatsächlichen Performancevorteil zu erreichen, steht uns die Lazy API von polars zur Verfügung. Mit dieser können wir eine Liste von Kommandos erstellen, ein sogenanntes LazyFrame. Dieses wendet die Befehle dann auf einen Schlag auf die Daten an. Dabei berechnet polars automatisch die effizienteste Weise den Plan von Befehlen umzusetzen.
Diese Methode ist nützlich, wenn wir die Daten die wir erhalten genau kennen, und wir die Strategie des Vorverarbeitens schon festgelegt haben. In unserem Fall können wir also folgendes LazyFrame erstellen:
q = (pl.scan_csv(input_data_file, null_values="null")
.select(
pl.col("datetime").str.to_datetime("%Y-%m-%dT%H:%M:%S%.fZ"),
(pl.col("temperature") / 100)).filter(
(pl.col("temperature") < 50) & (pl.col("temperature") > -50)
)
)
Und dieses dann ausführen:
df = q.collect()
Mit der python Bibliothek timeit
können wir den Performancevorteil messen. Dafür definieren wir eine Funktion, welche die Verarbeitungsschritte im eager loading Modus anwendet. Dann messen wir die benötigte Ausführungszeit.
def eager_loading():
df = pl.read_csv(input_data_file, null_values="null")
df = df.select(
pl.col("datetime").str.to_datetime("%Y-%m-%dT%H:%M:%S%.fZ"),
(pl.col("temperature") / 100)
)
df = df.filter(
(pl.col("temperature") < 50) & (pl.col("temperature") > -50)
)
%timeit eager_loading()
399 μs ± 2.51 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Das gleiche machen wir für den lazy loading Modus:
def lazy_loading():
q = (pl.scan_csv(input_data_file, null_values="null")
.select(
pl.col("datetime").str.to_datetime("%Y-%m-%dT%H:%M:%S%.fZ"),
(pl.col("temperature") / 100))
.filter(
(pl.col("temperature") < 50) & (pl.col("temperature") > -50))
)
q.collect()
%timeit lazy_loading()
392 μs ± 4.02 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Auf unserem kleinen Datensatz mit nur rund 600 Datenpunkten ist kein ausschlaggebender Performance Vorteil zu vermerken. Wenn wir allerdings nur das collect()
Statement messen, erhalten wir folgende Zeit:
%timeit q.collect()
255 μs ± 2.47 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
Somit sind wir, falls wir das LazyFrame schon definiert haben, auch auf unserem kleinen Datensatz um fast 150 μs schneller. Diese Möglichkeit haben wir mit der Eager API nicht.
Fazit
Wir haben gesehen, wie man mit polars und seinen Expressions select
, with_columns
, filter
und group_by
, erfolgreich Zeitreihen vorverarbeitet. LazyFrames haben wir auch kennengelernt, und können diese für effizienteres Ausführen nutzen.
Meiner Meinung nach liegt der Reiz von polars darin, dass die API besonders benutzerfreundlich gestaltet ist, sowie dem Kombinieren von Expressions. Dieses bietet die Möglichkeit auf einen Schlag die Daten auf viele verschiedene Arten zu manipulieren.
Dies war nur ein kleiner Einblick in die Welt von polars, denn es gibt noch mehr spannende polars Funktionen zu entdecken. Dennoch hoffe ich, er hat Ihnen gefallen, und Sie haben jetzt auch Lust, Daten mit Hilfe von polars aufzuräumen!
- Datenanalyse, Polars, Preprocessing, Python, Zeitreihen