Contents
  1. Get databricks/spark-sql-perf and davies/tpcds-kit from GitHub
  2. Use tpcds-kit
  3. Use spark-sql-perf
    3.1. Build the source
    3.2. Run the TPC-DS benchmark

Get databricks/spark-sql-perf and davies/tpcds-kit from GitHub
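  • Both projects can be cloned straight from GitHub; the URLs below are the standard GitHub paths for the two repositories named above, and the clone location is up to you:
git clone https://github.com/databricks/spark-sql-perf.git
git clone https://github.com/davies/tpcds-kit.git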

Use tpcds-kit

  • Build the source; spark-sql-perf calls this tool to generate the test data
  • In the tools directory (the equivalent shell commands are sketched after this list):
    1. Copy Makefile.suite to Makefile
    2. Edit Makefile and find the line containing "OS = "
    3. Read the comments and set OS to the current system, e.g. "OS = LINUX"
    4. Run "make"
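On a Linux host the four steps above amount to roughly the following; the sed command is just one way to make the "OS = LINUX" edit and assumes the line is uncommented, so editing Makefile by hand works equally well:
cd tpcds-kit/tools
cp Makefile.suite Makefile
# Set OS to the current system; LINUX is assumed here
sed -i 's/^OS[[:space:]]*=.*/OS = LINUX/' Makefile
make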

Use spark-sql-perf

Build the source

  • The sbt build tool is required:
    `curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo`
    `sudo yum install sbt`

  • Before building, switch to version 0.4.3: git checkout v0.4.3
  • Edit DatasetPerformance.scala in spark-sql-perf/src/main/scala/com/databricks/spark/sql/perf; the original file does not compile
- import org.apache.spark.sql.SQLContext
+ import org.apache.spark.sql.{Encoder, SQLContext}
// Change average to:
val average = new Aggregator[Long, SumAndCount, Double] {
  override def zero: SumAndCount = SumAndCount(0, 0)
  override def reduce(b: SumAndCount, a: Long): SumAndCount = {
    b.count += 1
    b.sum += a
    b
  }
  override def bufferEncoder = implicitly[Encoder[SumAndCount]]
  override def outputEncoder = implicitly[Encoder[Double]]
  override def finish(reduction: SumAndCount): Double = reduction.sum.toDouble / reduction.count
  override def merge(b1: SumAndCount, b2: SumAndCount): SumAndCount = {
    b1.count += b2.count
    b1.sum += b2.sum
    b1
  }
}.toColumn
  • Edit the build.sbt file and change the Scala version to 2.11.8 to match Spark 2.0; otherwise running on Spark 2.0 crashes spark-shell (the one-line change is shown after this list)
  • From the project root, ./bin/run --benchmark DatasetPerformance compiles the project and runs the DatasetPerformance benchmark
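The build.sbt change mentioned above is a single setting; everything else in the file is left as it is:
scalaVersion := "2.11.8"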

Run the TPC-DS benchmark

  • Raise the default driver memory limit in spark-env.sh; for this test it was set to 20G:
    SPARK_DRIVER_MEMORY="20G" # Memory for the driver (e.g. 1000M, 2G) (Default: 512 MB)
  • Launch the Spark 2.0 spark-shell, running on YARN, and specify the number of executors, executor memory, and related parameters:
/usr/hdp/2.4.0.0-169/spark-2.0.0-preview-bin-hadoop2.7/bin/spark-shell \
  --jars /data/ygmz/sparksqlperf/spark-sql-perf/target/scala-2.11/spark-sql-perf_2.11-0.4.3.jar \
  --num-executors 20 --executor-cores 2 --executor-memory 8G --master yarn-client
  • Create a SQLContext from sc
val sc: SparkContext // already created when spark-shell starts
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._
  • Create the data tables
import com.databricks.spark.sql.perf.tpcds.Tables
// Tables in the TPC-DS benchmark used by experiments.
// dsdgenDir is the directory containing dsdgen (dsdgen must be built first).
// scaleFactor is the amount of data to generate, in GB.
val tables = new Tables(sqlContext, "/data/ygmz/tpcds-kit/tools", 1)
// Generate the data.
tables.genData("hdfs://ochadoop02.jcloud.local:8020/tpctest", "parquet", true, false, false, false, false)
// Create metastore tables in a specified database for your data.
// Once the tables are created, the session automatically switches to the created database ("finaltest" here).
tables.createExternalTables("hdfs://ochadoop02.jcloud.local:8020/tpctest", "parquet", "finaltest", false)
// Or create temporary tables.
tables.createTemporaryTables(location, format)
// Set up the TPC-DS experiment.
import com.databricks.spark.sql.perf.tpcds.TPCDS
val tpcds = new TPCDS(sqlContext = sqlContext)
// Run the benchmark with the tpcds1_4 query set.
val experiment = tpcds.runExperiment(tpcds.tpcds1_4Queries)
  • While the benchmark is running, experiment.html shows the execution status of the SQL queries
  • When the run finishes, the results are saved under /spark/performance/ in JSON format, with a timestamped directory per run (a sketch for loading them follows)
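A minimal sketch for inspecting those results from the same spark-shell session; the result location and the "timestamp=*" path pattern are assumptions based on the default mentioned above, so check the schema before querying specific fields:
// Result location and directory layout are assumptions based on the default noted above
val results = sqlContext.read.json("/spark/performance/timestamp=*")
// Inspect the schema first, then pick out fields such as query names and execution times
results.printSchema()
results.show(truncate = false)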