PySparkにおける時系列データを扱うテクニック

時系列データは、時間に基づいてデータを分析するために重要です。PySparkを使用すると、大規模な時系列データを効率的に処理および分析することができます。この記事では、PySparkを使用して時系列データを扱う基本的なテクニックについて解説します。

1. PySparkのセットアップ

まず、PySparkの環境をセットアップします。以下のコマンドでPySparkをインストールします。

pip install pyspark

次に、必要なライブラリをインポートし、SparkSessionを作成します。

from pyspark.sql import SparkSession

# SparkSessionの作成
spark = SparkSession.builder.appName("TimeSeriesAnalysis").getOrCreate()

2. サンプルデータの作成

以下のコードを使用して、サンプルの時系列データを作成します。このデータは、特定の日付における売上額を含んでいます。

from pyspark.sql import Row
from pyspark.sql.functions import col, to_date

# サンプルデータの作成
data = [
    Row(date="2023-01-01", sales=200),
    Row(date="2023-01-02", sales=220),
    Row(date="2023-01-03", sales=250),
    Row(date="2023-01-04", sales=210),
    Row(date="2023-01-05", sales=230),
    Row(date="2023-01-06", sales=260),
    Row(date="2023-01-07", sales=280)
]

# データフレームの作成
df = spark.createDataFrame(data)
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# データフレームの表示
df.show()
+----------+-----+
|      date|sales|
+----------+-----+
|2023-01-01|  200|
|2023-01-02|  220|
|2023-01-03|  250|
|2023-01-04|  210|
|2023-01-05|  230|
|2023-01-06|  260|
|2023-01-07|  280|
+----------+-----+

3. 基本的な時系列データの操作

3.1 データの並べ替え

時系列データを操作する際には、データを時間順に並べ替えることが重要です。

# 日付でデータを並べ替え
df_sorted = df.orderBy("date")
df_sorted.show()

3.2 移動平均の計算

移動平均は、時系列データの傾向を把握するための基本的な方法です。

from pyspark.sql.window import Window
from pyspark.sql.functions import avg

# ウィンドウの定義
window_spec = Window.orderBy("date").rowsBetween(-2, 0)

# 移動平均の計算
df_with_moving_avg = df_sorted.withColumn("moving_avg", avg("sales").over(window_spec))
df_with_moving_avg.show()
+----------+-----+-----------+
|      date|sales|moving_avg |
+----------+-----+-----------+
|2023-01-01|  200|      200.0|
|2023-01-02|  220|      210.0|
|2023-01-03|  250|      223.3|
|2023-01-04|  210|      226.7|
|2023-01-05|  230|      230.0|
|2023-01-06|  260|      233.3|
|2023-01-07|  280|      256.7|
+----------+-----+-----------+

4. 時系列データの差分計算

データの変化を分析するために、前日との差分を計算します。

from pyspark.sql.functions import lag

# 前日の売上を取得
window_spec = Window.orderBy("date")
df_with_lag = df_sorted.withColumn("previous_sales", lag("sales", 1).over(window_spec))

# 差分の計算
df_with_diff = df_with_lag.withColumn("sales_diff", col("sales") - col("previous_sales"))
df_with_diff.show()
+----------+-----+--------------+----------+
|      date|sales|previous_sales|sales_diff|
+----------+-----+--------------+----------+
|2023-01-01|  200|          null|      null|
|2023-01-02|  220|           200|        20|
|2023-01-03|  250|           220|        30|
|2023-01-04|  210|           250|       -40|
|2023-01-05|  230|           210|        20|
|2023-01-06|  260|           230|        30|
|2023-01-07|  280|           260|        20|
+----------+-----+--------------+----------+

5. 時間の再サンプリング

時系列データを異なる時間単位に再サンプリングすることも重要です。以下の例では、日次データを月次データに再サンプリングします。

from pyspark.sql.functions import month

# 月ごとに売上を集計
df_monthly = df_sorted.withColumn("month", month("date"))
df_monthly_grouped = df_monthly.groupBy("month").agg({"sales": "sum"}).withColumnRenamed("sum(sales)", "monthly_sales")
df_monthly_grouped.show()
+-----+-------------+
|month|monthly_sales|
+-----+-------------+
|    1|         1650|
+-----+-------------+

6. 時間の特定の部分を抽出

時系列データを操作する際に、日付や時間の特定の部分(年、月、日、曜日など)を抽出することがよくあります。

from pyspark.sql.functions import year, month, dayofmonth, dayofweek

# 年、月、日、曜日の抽出
df_extracted = df_sorted.withColumn("year", year("date")) \
                        .withColumn("month", month("date")) \
                        .withColumn("day", dayofmonth("date")) \
                        .withColumn("weekday", dayofweek("date"))
df_extracted.show()
+----------+-----+----+-----+---+-------+
|      date|sales|year|month|day|weekday|
+----------+-----+----+-----+---+-------+
|2023-01-01|  200|2023|    1|  1|      1|
|2023-01-02|  220|2023|    1|  2|      2|
|2023-01-03|  250|2023|    1|  3|      3|
|2023-01-04|  210|2023|    1|  4|      4|
|2023-01-05|  230|2023|    1|  5|      5|
|2023-01-06|  260|2023|    1|  6|      6|
|2023-01-07|  280|2023|    1|  7|      7|
+----------+-----+----+-----+---+-------+

7. 時系列データの可視化

時系列データの可視化は、データの傾向やパターンを視覚的に把握するのに役立ちます。Matplotlibを使用して簡単な可視化を行います。

7.1 Matplotlibのインストール

Matplotlibをインストールしていない場合は、以下のコマンドでインストールします。

pip install matplotlib

7.2 可視化の実装

以下のコードを使用して、売上データの時系列プロットを作成します。

import matplotlib.pyplot as plt

# データをPandasデータフレームに変換
df_pandas = df_sorted.toPandas()

# プロットの作成
plt.figure(figsize=(10, 6))
plt.plot(df_pandas['date'], df_pandas['sales'], marker='o')
plt.title('Daily Sales')
plt.xlabel('Date')
plt.ylabel('Sales')
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

まとめ

この記事では、PySparkを使用して時系列データを扱う基本的なテクニックについて解説しました。データの並べ替え、移動平均の計算、差分計算、再サンプリング、時間の特定部分の抽出、そしてデータの可視化といった基本的な操作方法を学びました。これらのテクニックを活用することで、時系列データの分析を効率的に行うことができます。

よかったらシェアしてね!
目次
閉じる