tsfile-kmx-spark-connector

Note: this version of the connector expects the delta_object field in TsFile to have the format key:value(+key:value)*, where the keys are distinct. The connector exposes one or more TsFiles as a single table in SparkSQL. You may specify a single directory, or match multiple directories with wildcards. When multiple TsFiles are loaded, the resulting schema keeps the union of the sensors in all TsFiles.

Example

src/test/scala/cn.edu.thu.kvtsfile.spark.TSFileSuit

Path specification

basefolder/key=1/file1.tsfile
basefolder/key=2/file2.tsfile

Specifying basefolder as the path adds an extra column key to the table, whose value is 1 or 2.

e.g. path=basefolder

If the path is specified with wildcards, the directories are not treated as partitions.

e.g. path=basefolder/*/*.tsfile
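The partition behavior above can be sketched in plain Scala. The helper below is purely illustrative (it is not part of the connector API): it extracts key=value partition directories from a TsFile path, the way path=basefolder surfaces them as extra columns, and returns nothing for a wildcard-style path segment.

```scala
// Illustrative sketch only, not part of the connector API:
// extract "key=value" partition directories from a TsFile path.
object PartitionSketch {
  def partitionsOf(path: String): Map[String, String] =
    path.split("/")
      .filter(_.contains("="))        // only "key=value" directory segments
      .map { seg =>
        val Array(k, v) = seg.split("=", 2)
        k -> v
      }
      .toMap

  def main(args: Array[String]): Unit = {
    println(partitionsOf("basefolder/key=1/file1.tsfile")) // Map(key -> 1)
  }
}
```

A segment without `=` (such as a wildcard match) contributes no partition column, which mirrors the rule stated above.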

basefolder/file1.tsfile
basefolder/file2.tsfile

Specifying basefolder as the path merges the schemas of the TsFiles, keeping the union of their sensors.

e.g. path=basefolder
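The union semantics can be illustrated with a minimal sketch, assuming hypothetical sensor sets for two files (the sensor names are made up; the real schemas come from the TsFiles themselves):

```scala
// Illustrative sketch only: when several TsFiles are loaded, the table
// schema is the union of the sensors found in each file. Rows from a
// file that lacks a sensor get null in that column.
object SchemaUnionSketch {
  def mergedSchema(schemas: Seq[Set[String]]): Set[String] =
    schemas.foldLeft(Set.empty[String])(_ union _)

  def main(args: Array[String]): Unit = {
    val file1 = Set("sensor_1", "sensor_2") // hypothetical
    val file2 = Set("sensor_2", "sensor_3") // hypothetical
    // union keeps sensor_1, sensor_2 and sensor_3
    println(mergedSchema(Seq(file1, file2)))
  }
}
```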

Version requirements

The versions required for Spark and Java are as follows:

| Spark Version | Scala Version | Java Version |
| ------------- | ------------- | ------------ |
| 2.0+          | 2.11          | 1.8          |

Data type mapping

This library uses the following mapping from TsFile data types to SparkSQL data types:

| TsFile | SparkSQL    |
| ------ | ----------- |
| INT32  | IntegerType |
| INT64  | LongType    |
| FLOAT  | FloatType   |
| DOUBLE | DoubleType  |
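The table above can be written down as plain Scala data. The sketch deliberately uses type names as strings rather than Spark's DataType objects, so it does not require Spark on the classpath; it is a restatement of the table, not the connector's internal code:

```scala
// The TsFile -> SparkSQL type mapping from the table above, expressed
// as plain data (type names as strings, so no Spark dependency).
object TypeMappingSketch {
  val tsFileToSparkSql: Map[String, String] = Map(
    "INT32"  -> "IntegerType",
    "INT64"  -> "LongType",
    "FLOAT"  -> "FloatType",
    "DOUBLE" -> "DoubleType"
  )
}
```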

TsFile Schema -> SparkSQL Table Structure

The set of time-series data in section "Time-series Data" is used here to illustrate the mapping from TsFile schema to SparkSQL table structure.

delta_object: turbineId:turbine1

| time | sensor_1 | time | sensor_2 | time | sensor_3 |
| ---- | -------- | ---- | -------- | ---- | -------- |
| 1    | 1.2      | 1    | 20       | 2    | 50       |
| 3    | 1.4      | 2    | 20       | 4    | 51       |
| 5    | 1.1      | 3    | 21       | 6    | 52       |
| 7    | 1.8      | 4    | 20       | 8    | 53       |

A set of time-series data

There is only one reserved column in the SparkSQL table:

  • time : Timestamp, LongType

The SparkSQL table structure is as follows:

| time(LongType) | turbineId(StringType) | sensor_1(FloatType) | sensor_2(IntegerType) | sensor_3(IntegerType) |
| -------------- | --------------------- | ------------------- | --------------------- | --------------------- |
| 1              | turbine1              | 1.2                 | 20                    | null                  |
| 2              | turbine1              | null                | 20                    | 50                    |
| 3              | turbine1              | 1.4                 | 21                    | null                  |
| 4              | turbine1              | null                | 20                    | 51                    |
| 5              | turbine1              | 1.1                 | null                  | null                  |
| 6              | turbine1              | null                | null                  | 52                    |
| 7              | turbine1              | 1.8                 | null                  | null                  |
| 8              | turbine1              | null                | null                  | 53                    |
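The null-filling in the table above can be sketched in plain Scala: per-sensor (time, value) series are merged into wide rows keyed by time, with None wherever a sensor has no sample at that timestamp. This is an illustration of the mapping, not the connector's actual implementation:

```scala
// Illustrative sketch only: pivot per-sensor (time, value) series into
// wide rows keyed by time; a sensor with no sample at a given time
// yields None, which corresponds to null in the SparkSQL table.
object PivotSketch {
  def pivot(series: Map[String, Seq[(Long, Double)]]): Map[Long, Map[String, Option[Double]]] = {
    val sensors = series.keySet
    val times   = series.values.flatten.map(_._1).toSet
    times.map { t =>
      t -> sensors.map { s =>
        s -> series(s).collectFirst { case (`t`, v) => v }
      }.toMap
    }.toMap
  }
}
```

For example, with sensor_1 sampled at time 1 and sensor_3 sampled at time 2 (as in the table), the row for time 1 carries a value for sensor_1 and None for sensor_3, and vice versa for time 2.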

Examples

Scala API
  • Example 1

     // import this library and Spark
     import cn.edu.thu.kvtsfile._
     import org.apache.spark.sql.SparkSession
    
     val spark = SparkSession.builder().master("local").getOrCreate()
    
     //read data in TsFile and create a table
     val df = spark.read.kvtsfile("test.ts")
     df.createOrReplaceTempView("TsFile_table")
    
     //query with filter
     val newDf = spark.sql("select * from TsFile_table where sensor_1 > 1.2").cache()
    
     newDf.show()
    
  • Example 2

     import cn.edu.thu.kvtsfile._
     import org.apache.spark.sql.SparkSession
     val spark = SparkSession.builder().master("local").getOrCreate()
     val df = spark.read
           .format("cn.edu.thu.kvtsfile")
           .load("test.ts")
    
    
     df.filter("sensor_1 > 1.2").show()
    
  • Example 3

     import cn.edu.thu.kvtsfile._
     import org.apache.spark.sql.SparkSession
     val spark = SparkSession.builder().master("local").getOrCreate()
    
     //create a table in SparkSQL and build relation with a TsFile
     spark.sql("create temporary view TsFile using cn.edu.thu.kvtsfile options(path = \"test.ts\")")
    
     spark.sql("select * from TsFile where sensor_1 > 1.2").show()
    
spark-shell

You can package the project and use it in spark-shell.

mvn clean scala:compile compile package

The jar is located at: target/kvtsfile-spark-connector-0.1.0.jar

$ bin/spark-shell --jars kvtsfile-spark-connector-0.1.0.jar,tsfile-0.1.0.jar

scala> sql("CREATE TEMPORARY TABLE TsFile_table USING cn.edu.thu.kvtsfile.spark OPTIONS (path \"hdfs://localhost:9000/test.ts\")")

scala> sql("select * from TsFile_table where sensor_1 > 1.2").show()
