PySparkにおけるピボット(pivot)とアンピボット(unpivot)のテクニック集

データ分析では、データの構造を変換するためにピボット(pivot)とアンピボット(unpivot)操作が頻繁に使われます。PySparkを使えば、大規模なデータセットでもこれらの操作を効率的に行うことができます。本記事では、PySparkにおけるピボットとアンピボットの詳細なテクニックについて解説し、特にアンピボット時に列名に数値を含む場合の対処法についても説明します。

1. ピボット(pivot)の詳細テクニック

ピボット操作では、データを長形式から広形式に変換します。ここでは、基本的なピボットに加えて、複数の集約関数を使用する方法や、特定の値をピボットする方法について説明します。

1.1 複数の集約関数の使用

ピボット操作時に、複数の集約関数を適用することができます。以下の例では、スコアの合計と平均を計算します。

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum

# SparkSessionの作成
spark = SparkSession.builder.appName("PivotExample").getOrCreate()

# サンプルデータの作成
data = [
    ("Alice", "Math", 85),
    ("Alice", "English", 78),
    ("Bob", "Math", 92),
    ("Bob", "English", 81),
    ("Cathy", "Math", 89),
    ("Cathy", "English", 90)
]
columns = ["Name", "Subject", "Score"]

# データフレームの作成
df = spark.createDataFrame(data, columns)

# 複数の集約関数を使用したピボット操作
df_pivot = df.groupBy("Name").pivot("Subject").agg(sum("Score").alias("TotalScore"), avg("Score").alias("AvgScore"))

# ピボット結果の表示
df_pivot.show()
+-----+------------+----------+------------+----------+
| Name|English_TotalScore|English_AvgScore|Math_TotalScore|Math_AvgScore|
+-----+------------+----------+------------+----------+
|Alice|           78|      78.0|           85|      85.0|
|  Bob|           81|      81.0|           92|      92.0|
|Cathy|           90|      90.0|           89|      89.0|
+-----+------------+----------+------------+----------+

1.2 特定の値をピボットする

特定の値に基づいてピボットする場合、pivotメソッドの第二引数に値のリストを渡します。

# 特定の値(MathとEnglish)のみをピボット
df_pivot_specific = df.groupBy("Name").pivot("Subject", ["Math", "English"]).sum("Score")

# ピボット結果の表示
df_pivot_specific.show()
+-----+----+-------+
| Name|Math|English|
+-----+----+-------+
|Alice|  85|     78|
|  Bob|  92|     81|
|Cathy|  89|     90|
+-----+----+-------+

2. アンピボット(unpivot)の詳細テクニック

アンピボット操作では、データを広形式から長形式に変換します。PySparkには直接的なアンピボットメソッドはありませんが、selectExprメソッドを使用して実現できます。

2.1 基本的なアンピボット操作

まず、基本的なアンピボット操作の例を示します。

# サンプルデータの作成(ピボットされたデータ)
data_pivot = [
    ("Alice", 85, 78),
    ("Bob", 92, 81),
    ("Cathy", 89, 90)
]
columns_pivot = ["Name", "Math", "English"]

# データフレームの作成
df_pivoted = spark.createDataFrame(data_pivot, columns_pivot)

# ピボットされたデータフレームの表示
df_pivoted.show()
+-----+----+-------+
| Name|Math|English|
+-----+----+-------+
|Alice|  85|     78|
|  Bob|  92|     81|
|Cathy|  89|     90|
+-----+----+-------+

このデータフレームをアンピボットし、元の形式に戻します。

from pyspark.sql.functions import expr

# アンピボット操作
unpivot_expr = "stack(2, 'Math', Math, 'English', English) as (Subject, Score)"
df_unpivot = df_pivoted.select("Name", expr(unpivot_expr))

# アンピボット結果の表示
df_unpivot.show()
+-----+-------+-----+
| Name|Subject|Score|
+-----+-------+-----+
|Alice|   Math|   85|
|Alice|English|   78|
|  Bob|   Math|   92|
|  Bob|English|   81|
|Cathy|   Math|   89|
|Cathy|English|   90|
+-----+-------+-----+

2.2 列名に数値を含む場合のアンピボット

列名に数値を含む場合、stack関数を使用してアンピボットを行う際に注意が必要です。列名が数値で始まる場合はバッククォート(`)で囲む必要があります。

以下に、列名に数値を含む場合のアンピボット操作の例を示します。

# サンプルデータの作成(数値を含む列名)
data_pivot_num = [
    ("Alice", 85, 78),
    ("Bob", 92, 81),
    ("Cathy", 89, 90)
]
columns_pivot_num = ["Name", "2021_Math", "2021_English"]

# データフレームの作成
df_pivoted_num = spark.createDataFrame(data_pivot_num, columns_pivot_num)

# ピボットされたデータフレームの表示
df_pivoted_num.show()
+-----+----------+------------+
| Name|2021_Math|2021_English|
+-----+----------+------------+
|Alice|        85|          78|
|  Bob|        92|          81|
|Cathy|        89|          90|
+-----+----------+------------+

このデータフレームをアンピボットし、元の形式に戻します。

# アンピボット操作
unpivot_expr_num = "stack(2, '2021_Math', `2021_Math`, '2021_English', `2021_English`) as (Subject, Score)"
df_unpivot_num = df_pivoted_num.select("Name", expr(unpivot_expr_num))

# アンピボット結果の表示
df_unpivot_num.show()
+-----+------------+-----+
| Name|     Subject|Score|
+-----+------------+-----+
|Alice|   2021_Math|   85|
|Alice|2021_English|   78|
|  Bob|   2021_Math|   92|
|  Bob|2021_English|   81|
|Cathy|   2021_Math|   89|
|Cathy|2021_English|   90|
+-----+------------+-----+

3. まとめ

PySparkを使用すると、データのピボットとアンピボット操作を効率的に行うことができます。ピボット操作では、groupBypivotを組み合わせて特定の値を集約できます。アンピボット操作では、selectExprstack関数を使用して広形式から長形式に変換できます。列名に数値が含まれる場合は、バッククォートを使用して正しくアンピボット操作を行うことが重要です。

PySpark公式ドキュメント: DataFrame

よかったらシェアしてね!
目次
閉じる