時系列データは、時間に基づいてデータを分析するために重要です。PySparkを使用すると、大規模な時系列データを効率的に処理および分析することができます。この記事では、PySparkを使用して時系列データを扱う基本的なテクニックについて解説します。
1. PySparkのセットアップ
まず、PySparkの環境をセットアップします。以下のコマンドでPySparkをインストールします。
pip install pyspark
次に、必要なライブラリをインポートし、SparkSessionを作成します。
from pyspark.sql import SparkSession
# SparkSessionの作成
spark = SparkSession.builder.appName("TimeSeriesAnalysis").getOrCreate()
2. サンプルデータの作成
以下のコードを使用して、サンプルの時系列データを作成します。このデータは、特定の日付における売上額を含んでいます。
from pyspark.sql import Row
from pyspark.sql.functions import col, to_date
# サンプルデータの作成
data = [
Row(date="2023-01-01", sales=200),
Row(date="2023-01-02", sales=220),
Row(date="2023-01-03", sales=250),
Row(date="2023-01-04", sales=210),
Row(date="2023-01-05", sales=230),
Row(date="2023-01-06", sales=260),
Row(date="2023-01-07", sales=280)
]
# データフレームの作成
df = spark.createDataFrame(data)
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
# データフレームの表示
df.show()
+----------+-----+
| date|sales|
+----------+-----+
|2023-01-01| 200|
|2023-01-02| 220|
|2023-01-03| 250|
|2023-01-04| 210|
|2023-01-05| 230|
|2023-01-06| 260|
|2023-01-07| 280|
+----------+-----+
3. 基本的な時系列データの操作
3.1 データの並べ替え
時系列データを操作する際には、データを時間順に並べ替えることが重要です。
# 日付でデータを並べ替え
df_sorted = df.orderBy("date")
df_sorted.show()
3.2 移動平均の計算
移動平均は、時系列データの傾向を把握するための基本的な方法です。
from pyspark.sql.window import Window
from pyspark.sql.functions import avg
# ウィンドウの定義
window_spec = Window.orderBy("date").rowsBetween(-2, 0)
# 移動平均の計算
df_with_moving_avg = df_sorted.withColumn("moving_avg", avg("sales").over(window_spec))
df_with_moving_avg.show()
+----------+-----+-----------+
| date|sales|moving_avg |
+----------+-----+-----------+
|2023-01-01| 200| 200.0|
|2023-01-02| 220| 210.0|
|2023-01-03| 250| 223.3|
|2023-01-04| 210| 226.7|
|2023-01-05| 230| 230.0|
|2023-01-06| 260| 233.3|
|2023-01-07| 280| 256.7|
+----------+-----+-----------+
4. 時系列データの差分計算
データの変化を分析するために、前日との差分を計算します。
from pyspark.sql.functions import lag
# 前日の売上を取得
window_spec = Window.orderBy("date")
df_with_lag = df_sorted.withColumn("previous_sales", lag("sales", 1).over(window_spec))
# 差分の計算
df_with_diff = df_with_lag.withColumn("sales_diff", col("sales") - col("previous_sales"))
df_with_diff.show()
+----------+-----+--------------+----------+
| date|sales|previous_sales|sales_diff|
+----------+-----+--------------+----------+
|2023-01-01| 200| null| null|
|2023-01-02| 220| 200| 20|
|2023-01-03| 250| 220| 30|
|2023-01-04| 210| 250| -40|
|2023-01-05| 230| 210| 20|
|2023-01-06| 260| 230| 30|
|2023-01-07| 280| 260| 20|
+----------+-----+--------------+----------+
5. 時間の再サンプリング
時系列データを異なる時間単位に再サンプリングすることも重要です。以下の例では、日次データを月次データに再サンプリングします。
from pyspark.sql.functions import month
# 月ごとに売上を集計
df_monthly = df_sorted.withColumn("month", month("date"))
df_monthly_grouped = df_monthly.groupBy("month").agg({"sales": "sum"}).withColumnRenamed("sum(sales)", "monthly_sales")
df_monthly_grouped.show()
+-----+-------------+
|month|monthly_sales|
+-----+-------------+
| 1| 1650|
+-----+-------------+
6. 時間の特定の部分を抽出
時系列データを操作する際に、日付や時間の特定の部分(年、月、日、曜日など)を抽出することがよくあります。
from pyspark.sql.functions import year, month, dayofmonth, dayofweek
# 年、月、日、曜日の抽出
df_extracted = df_sorted.withColumn("year", year("date")) \
.withColumn("month", month("date")) \
.withColumn("day", dayofmonth("date")) \
.withColumn("weekday", dayofweek("date"))
df_extracted.show()
+----------+-----+----+-----+---+-------+
| date|sales|year|month|day|weekday|
+----------+-----+----+-----+---+-------+
|2023-01-01| 200|2023| 1| 1| 1|
|2023-01-02| 220|2023| 1| 2| 2|
|2023-01-03| 250|2023| 1| 3| 3|
|2023-01-04| 210|2023| 1| 4| 4|
|2023-01-05| 230|2023| 1| 5| 5|
|2023-01-06| 260|2023| 1| 6| 6|
|2023-01-07| 280|2023| 1| 7| 7|
+----------+-----+----+-----+---+-------+
7. 時系列データの可視化
時系列データの可視化は、データの傾向やパターンを視覚的に把握するのに役立ちます。Matplotlibを使用して簡単な可視化を行います。
7.1 Matplotlibのインストール
Matplotlibをインストールしていない場合は、以下のコマンドでインストールします。
pip install matplotlib
7.2 可視化の実装
以下のコードを使用して、売上データの時系列プロットを作成します。
import matplotlib.pyplot as plt
# データをPandasデータフレームに変換
df_pandas = df_sorted.toPandas()
# プロットの作成
plt.figure(figsize=(10, 6))
plt.plot(df_pandas['date'], df_pandas['sales'], marker='o')
plt.title('Daily Sales')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
まとめ
この記事では、PySparkを使用して時系列データを扱う基本的なテクニックについて解説しました。データの並べ替え、移動平均の計算、差分計算、再サンプリング、時間の特定部分の抽出、そしてデータの可視化といった基本的な操作方法を学びました。これらのテクニックを活用することで、時系列データの分析を効率的に行うことができます。