Spark Window Queries
Related code: https://github.com/staticor/WindowQueryInSparkHive.git
What Is a Window Query
Example 1
PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
Example 2
PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
Example 1 as a chained Spark expression
Window.partitionBy("country").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
Example 2 as a chained Spark expression
Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)
Without partition by, the whole result set is treated as a single window and the ordering is global; with partition by, the data is split into sub-windows by the specified column(s).
An order by is then usually applied to sort the rows within each window.
On the Spark side, window definitions are implemented by two classes: Window.scala and WindowSpec.scala, as sketched below.
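Roughly, Window is the factory object that starts a definition, and every builder call returns a new WindowSpec; a short sketch:

import org.apache.spark.sql.expressions.{Window, WindowSpec}

// Window provides the static entry points ...
val spec: WindowSpec = Window.partitionBy("country")

// ... and WindowSpec is the immutable builder: each call returns a refined copy
val refined: WindowSpec = spec
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)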
rowsBetween
The window frame's upper and lower boundaries can take the following forms:
- unbounded preceding: the first row of the partition
- unbounded following: the last row of the partition
- current row (the integer 0): the current row
If a plain integer is passed instead, positive or negative, it is an offset relative to the current row and describes a sliding window: for example, -3 means the 3 rows before the current row and +4 means the 4 rows after it, giving the frame [-3, 4].
Both endpoints are inclusive, so that sliding window covers 8 rows in total.
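The special boundaries are exposed as constants on the Window object (they are Long sentinel values under the hood); a sketch of the [-3, 4] frame described above:

import org.apache.spark.sql.expressions.Window

// Sentinel values for the special boundaries
val first   = Window.unboundedPreceding   // Long.MinValue: first row of the partition
val last    = Window.unboundedFollowing   // Long.MaxValue: last row of the partition
val current = Window.currentRow           // 0: the current row

// The 8-row frame [-3, 4]: 3 preceding rows + the current row + 4 following rows
val eightRowFrame = Window.partitionBy("country").orderBy("date").rowsBetween(-3, 4)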
The core frame-building method of WindowSpec is rowsBetween, which returns a new WindowSpec carrying the specified frame.
Source (WindowSpec.scala, rowsBetween):
def rowsBetween(start: Long, end: Long): WindowSpec = {
  val boundaryStart = start match {
    case 0 => CurrentRow
    case Long.MinValue => UnboundedPreceding
    case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
    case x => throw QueryCompilationErrors.invalidBoundaryStartError(x)
  }

  val boundaryEnd = end match {
    case 0 => CurrentRow
    case Long.MaxValue => UnboundedFollowing
    case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
    case x => throw QueryCompilationErrors.invalidBoundaryEndError(x)
  }

  new WindowSpec(
    partitionSpec,
    orderSpec,
    SpecifiedWindowFrame(RowFrame, boundaryStart, boundaryEnd))
}
Window attributes
- partitionSpec
- orderSpec
- SpecifiedWindowFrame
SpecifiedWindowFrame
SpecifiedWindowFrame is a case class:
case class SpecifiedWindowFrame(frameType: FrameType, lower: Expression, upper: Expression) extends WindowFrame
partitionSpec and orderSpec bucket the data and sort the rows within each bucket; the third part, the window frame, determines the range of rows each window function sees.
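For instance, rowsBetween(Window.currentRow, 1) produces the following Catalyst frame (internal API, shown only to illustrate the three-part structure):

import org.apache.spark.sql.catalyst.expressions.{CurrentRow, Literal, RowFrame, SpecifiedWindowFrame}

// frameType = RowFrame, lower = CurrentRow, upper = Literal(1)
val frame = SpecifiedWindowFrame(RowFrame, CurrentRow, Literal(1))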
WindowSpecDefinition
In the analyzed plan shown in the next section, these three parts appear together as a single windowspecdefinition(...) expression, which carries the partition spec, the order spec, and the frame into the logical plan.
Execution plan analysis
import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df: DataFrame = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
  .toDF("id", "category")

// frame: current row plus the one following row, per category, ordered by id
val byCategoryOrderedById =
  Window.partitionBy("category")
    .orderBy("id")
    .rowsBetween(Window.currentRow, 1)

val frame: DataFrame = df.withColumn("sum", sum("id") over byCategoryOrderedById)
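The plan dumps below were presumably produced by inspecting frame.queryExecution; something like the following prints the same three stages:

// Print the analyzed, optimized, and physical plans for the window query
println("================ analyzed ====================")
println(frame.queryExecution.analyzed)
println("============= optimized Plan =================")
println(frame.queryExecution.optimizedPlan)
println("============= executed Plan =================")
println(frame.queryExecution.executedPlan)

// Alternatively, frame.explain(true) prints the parsed/analyzed/optimized/physical plans in one call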
Output
================ analyzed ====================
Project [id#7, category#8, sum#12L]
+- Project [id#7, category#8, sum#12L, sum#12L]
+- Window [sum(id#7) windowspecdefinition(category#8, id#7 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 1)) AS sum#12L], [category#8], [id#7 ASC NULLS FIRST]
+- Project [id#7, category#8]
+- Project [_1#2 AS id#7, _2#3 AS category#8]
+- LocalRelation [_1#2, _2#3]
============= optimized Plan=================
Window [sum(id#7) windowspecdefinition(category#8, id#7 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 1)) AS sum#12L], [category#8], [id#7 ASC NULLS FIRST]
+- LocalRelation [id#7, category#8]
============= executed Plan=================
AdaptiveSparkPlan isFinalPlan=false
+- Window [sum(id#7) windowspecdefinition(category#8, id#7 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 1)) AS sum#12L], [category#8], [id#7 ASC NULLS FIRST]
+- Sort [category#8 ASC NULLS FIRST, id#7 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(category#8, 200), ENSURE_REQUIREMENTS, [id=#52]
+- LocalTableScan [id#7, category#8]
Note the Exchange hashpartitioning(category#8, 200) step: partitionBy("category") requires all rows with the same category value to end up in the same task, so the physical plan inserts a shuffle on category, followed by a per-partition Sort, before the Window operator runs.
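The 200 in hashpartitioning(category#8, 200) is the default value of spark.sql.shuffle.partitions; for small local datasets it can be lowered (a configuration sketch):

// Default is 200 shuffle partitions; lower it for small local datasets
spark.conf.set("spark.sql.shuffle.partitions", "8")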
A complete example
rank
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Example_4_Spark {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[4]")
    val spark: SparkSession = SparkSession.builder().config(conf)
      .appName("CsvExample")
      .getOrCreate()
    import spark.implicits._

    val scoreDF = spark.sparkContext.makeRDD(Array(
      Score("a", 1, 80),
      Score("b", 1, 78),
      Score("c", 1, 95),
      Score("d", 2, 74),
      Score("e", 2, 92),
      Score("f", 3, 99),
      Score("g", 3, 99),
      Score("h", 3, 45),
      Score("i", 3, 55),
      Score("j", 3, 78))).toDF("name", "class", "score")

    scoreDF.createOrReplaceTempView("score")
    scoreDF.show()

    println("//*************** Find the top-scoring student(s) in each class ***************//")

    println(" /******* Table after applying the window function ********/")
    spark.sql("select name, class, score, rank() over(partition by class order by score desc) rank from score").show()

    println(" /******* Final result ********/")
    spark.sql(
      """
        |select name, class, score, rk
        |from (
        |  select name, class, score, rank() over(partition by class order by score desc) rk
        |  from score
        |) mid where rk = 1
        |""".stripMargin).show()

    spark.stop()
  }
}

case class Score(name: String, group: Int, score: Int)
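For comparison, the same top-per-class query can be written with the DataFrame API instead of SQL; a sketch using rank and a WindowSpec, reusing scoreDF and the imports from the example above:

// Equivalent DataFrame API version of the SQL above
val byClassScoreDesc = Window.partitionBy("class").orderBy(col("score").desc)

scoreDF
  .withColumn("rk", rank().over(byClassScoreDesc))
  .where(col("rk") === 1)
  .show()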