Related code: https://github.com/staticor/WindowQueryInSparkHive.git

What is a window query

Example 1
 PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

Example 2
 PARTITION BY country ORDER BY date ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING
Spark chained-API equivalent of Example 1
Window.partitionBy("country").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)


Spark chained-API equivalent of Example 2
Window.partitionBy("country").orderBy("date").rowsBetween(-3, 3)

Without PARTITION BY the ordering is global; with PARTITION BY the data is split into per-group sub-windows on the specified column(s). An ORDER BY is normally applied inside each window.
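A minimal sketch of the difference (the DataFrame df and the columns country, date and amount are assumed here, they are not part of the repository code):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Without partitionBy every row falls into one global window
// (Spark warns about moving all data to a single partition);
// with partitionBy("country") each country gets its own window.
val globalOrder = Window.orderBy("date")
val perCountry  = Window.partitionBy("country").orderBy("date")

df.withColumn("global_rank", rank() over globalOrder)
  .withColumn("rank_in_country", rank() over perCountry)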

Window definitions are implemented by two classes: Window.scala and WindowSpec.scala.

rowsBetween

The frame's lower and upper boundaries can take the following forms:

  • unbounded preceding: the first row of the input
  • unbounded following: the last row of the input
  • current row (i.e. the integer 0): the current row

If an integer (positive or negative) is passed instead, it is an offset and the result is a sliding window: -3 means the 3 rows before the current row, +4 means the 4 rows after it, so [-3, 4] is a frame that includes both endpoints and the current row, i.e. a sliding window of 8 rows.
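For example, a moving sum over such a frame could look like this (df and the amount column are assumed, as above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// 8-row sliding frame: 3 preceding rows + the current row + 4 following rows
val sliding = Window.partitionBy("country").orderBy("date").rowsBetween(-3, 4)
df.withColumn("moving_sum", sum("amount") over sliding)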

The core method that constructs a WindowSpec with an explicit frame is rowsBetween.

Source (WindowSpec.scala, rowsBetween):
def rowsBetween(start: Long, end: Long): WindowSpec = {
    val boundaryStart = start match {
      case 0 => CurrentRow
      case Long.MinValue => UnboundedPreceding
      case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
      case x => throw QueryCompilationErrors.invalidBoundaryStartError(x)
    }

    val boundaryEnd = end match {
      case 0 => CurrentRow
      case Long.MaxValue => UnboundedFollowing
      case x if Int.MinValue <= x && x <= Int.MaxValue => Literal(x.toInt)
      case x => throw QueryCompilationErrors.invalidBoundaryEndError(x)
    }

    new WindowSpec(
      partitionSpec,
      orderSpec,
      SpecifiedWindowFrame(RowFrame, boundaryStart, boundaryEnd))
  }
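The special boundary values exposed on the Window object are exactly the sentinel Long values this pattern match looks for, so the two calls below describe the same frame:

import org.apache.spark.sql.expressions.Window

// Window.unboundedPreceding == Long.MinValue, Window.unboundedFollowing == Long.MaxValue,
// Window.currentRow == 0, which is what the match in rowsBetween relies on.
val spec1 = Window.partitionBy("country").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val spec2 = Window.partitionBy("country").orderBy("date")
  .rowsBetween(Long.MinValue, 0)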
A window is described by three attributes:
  • partitionSpec
  • orderSpec
  • SpecifiedWindowFrame

SpecifiedWindowFrame is a case class:

case class SpecifiedWindowFrame(frameType: FrameType, lower: Expression, upper: Expression) extends WindowFrame

partitionSpec and orderSpec bucket the data and sort the rows inside each bucket; the third part, the frame, fixes the window's range.

At analysis time these pieces are bundled into a WindowSpecDefinition expression, which appears as windowspecdefinition(...) in the plans below.
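Its rough shape in Catalyst is sketched below (field names recalled from org.apache.spark.sql.catalyst.expressions and may differ slightly between Spark versions):

// Sketch of the Catalyst expression carrying a resolved window specification:
// the same three pieces (partitioning, ordering, frame) as on the DataFrame side.
case class WindowSpecDefinition(
    partitionSpec: Seq[Expression],
    orderSpec: Seq[SortOrder],
    frameSpecification: WindowFrame) extends Expression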

Execution plan analysis

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df: DataFrame = Seq((1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b"))
  .toDF("id", "category")

// Frame: from the current row to the next row, within each category, ordered by id
val byCategoryOrderedById =
  Window.partitionBy("category")
    .orderBy("id")
    .rowsBetween(Window.currentRow, 1)

val frame: DataFrame = df.withColumn("sum", sum("id") over byCategoryOrderedById)
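The plans shown below can be printed from the DataFrame's queryExecution, for example:

println("================  analyzed ====================")
println(frame.queryExecution.analyzed)
println("============= optimized Plan=================")
println(frame.queryExecution.optimizedPlan)
println("============= executed Plan=================")
println(frame.queryExecution.executedPlan)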

Output:


================  analyzed ====================
Project [id#7, category#8, sum#12L]
+- Project [id#7, category#8, sum#12L, sum#12L]
   +- Window [sum(id#7) windowspecdefinition(category#8, id#7 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 1)) AS sum#12L], [category#8], [id#7 ASC NULLS FIRST]
      +- Project [id#7, category#8]
         +- Project [_1#2 AS id#7, _2#3 AS category#8]
            +- LocalRelation [_1#2, _2#3]

============= optimized Plan=================
Window [sum(id#7) windowspecdefinition(category#8, id#7 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 1)) AS sum#12L], [category#8], [id#7 ASC NULLS FIRST]
+- LocalRelation [id#7, category#8]

============= executed Plan=================
AdaptiveSparkPlan isFinalPlan=false
+- Window [sum(id#7) windowspecdefinition(category#8, id#7 ASC NULLS FIRST, specifiedwindowframe(RowFrame, currentrow$(), 1)) AS sum#12L], [category#8], [id#7 ASC NULLS FIRST]
   +- Sort [category#8 ASC NULLS FIRST, id#7 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(category#8, 200), ENSURE_REQUIREMENTS, [id=#52]
         +- LocalTableScan [id#7, category#8]


Note the Exchange hashpartitioning and Sort steps in the physical plan: the window's partitionBy("category") becomes a hash repartition on category, and the orderBy becomes a per-partition sort.
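The 200 in hashpartitioning(category#8, 200) is simply the default value of spark.sql.shuffle.partitions; it can be tuned before the query runs, for example:

// Controls the number of partitions of the Exchange introduced by the window
spark.conf.set("spark.sql.shuffle.partitions", "64")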

A concrete example

rank

 
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object Example_4_Spark {

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[4]")

    val spark: SparkSession = SparkSession.builder().config(conf)
      .appName("CsvExample")
      .getOrCreate()

    import spark.implicits._

    // Sample scores: (name, class, score)
    val scoreDF = spark.sparkContext.makeRDD(Array(
      Score("a", 1, 80),
      Score("b", 1, 78),
      Score("c", 1, 95),
      Score("d", 2, 74),
      Score("e", 2, 92),
      Score("f", 3, 99),
      Score("g", 3, 99),
      Score("h", 3, 45),
      Score("i", 3, 55),
      Score("j", 3, 78))).toDF("name", "class", "score")
    scoreDF.createOrReplaceTempView("score")
    scoreDF.show()

    println("//***************  Students with the top score in each class  ***************/")
    println("    /*******  Table produced by the window function  ********/")
    spark.sql("select name, class, score, rank() over(partition by class order by score desc) rank from score").show()

    println("    /*******  Final result  *******/")
    spark.sql(
      """
        |select name, class, score, rk
        |from (
        |   select name, class, score, rank() over(partition by class order by score desc) rk
        |   from score
        |) mid where rk = 1
        |""".stripMargin).show()

    spark.stop()
  }
}

case class Score(name: String, group: Int, score: Int)
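For comparison, the same top-score-per-class query can be expressed with the DataFrame API instead of SQL (a sketch; byClassScoreDesc is a name introduced here):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// rank() over a per-class window ordered by score descending, then keep rank 1
val byClassScoreDesc = Window.partitionBy("class").orderBy(col("score").desc)
scoreDF
  .withColumn("rk", rank() over byClassScoreDesc)
  .where(col("rk") === 1)
  .select("name", "class", "score", "rk")
  .show()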