forked from filodb/FiloDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathFiloDB_GDELT.snb
135 lines (135 loc) · 5.82 KB
/
FiloDB_GDELT.snb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
{
"metadata" : {
"name" : "FiloDB GDELT",
"user_save_timestamp" : "1969-12-31T16:00:00.000Z",
"auto_save_timestamp" : "1969-12-31T16:00:00.000Z",
"language_info" : {
"name" : "scala",
"file_extension" : "scala",
"codemirror_mode" : "text/x-scala"
},
"trusted" : true,
"customLocalRepo" : null,
"customRepos" : null,
"customDeps" : null,
"customImports" : [ "import org.apache.spark.sql.functions._\n" ],
"customArgs" : null,
"customSparkConf" : {
"spark.app.name" : "Notebook",
"spark.master" : "local[8]",
"spark.executor.memory" : "2G"
}
},
"cells" : [ {
"metadata" : { },
"cell_type" : "markdown",
"source" : "## Querying the GDELT dataset using FiloDB\n1. Start Cassandra\n2. Ingest the GDELT dataset, according to the [FiloDB README](https://github.com/tuplejump/FiloDB#spark-data-source-api-example-spark-shell) \n\nNote that the GDELT dataset is partitioned by YearMonth."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val df = sqlContext.read.format(\"filodb.spark\").option(\"dataset\", \"gdelt\").load()",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "df.registerTempTable(\"gdelt\")",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "You can query FiloDB using the Spark DataFrames DSL, or using SQL. Both of these are demoed below. For SQL queries you need to register the DataFrame with a table name first.\nThe third way is a way of converting the output to a Scala collection so that Spark Notebook can render as a graph."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "df.select(count(\"MonthYear\")).show",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "sqlContext.sql(\"select Actor1Name, count(*) as c from gdelt group by Actor1Name order by c desc limit 15\")",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : true
},
"cell_type" : "code",
"source" : "val sql1 = \"select Actor1Name, count(*) as c from gdelt group by Actor1Name order by c desc limit 15\"\nsqlContext.sql(sql1).collect.map { row => (row.getString(0), row.getLong(1)) }",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : true
},
"cell_type" : "markdown",
"source" : "## Machine Learning with FiloDB\n\nNow, how about something uniquely Spark .. feed SQL query results to MLLib to compute a correlation:"
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "import org.apache.spark.mllib.stat.Statistics\nval numMentions = df.select(\"NumMentions\").map(row => row.getInt(0).toDouble)\nval numArticles = df.select(\"NumArticles\").map(row => row.getInt(0).toDouble)\nStatistics.corr(numMentions, numArticles, \"pearson\")",
"outputs" : [ ]
}, {
"metadata" : { },
"cell_type" : "markdown",
"source" : "## Reactive Top Countries Graph by Year-Month\n\nNow, let's demonstrate how FiloDB lets one produce low-latency graphs and UIs. We will figure out the top countries by number of political events (Actor1CountryCode) for a particular year-month, and let the user control what the year-month is."
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "val topCountries = sqlContext.sql(\"\"\"select Actor1CountryCode, count(*) as c from gdelt WHERE MonthYear = 197901 \n group by Actor1CountryCode order by c desc limit 15\"\"\")\n// NOTE: CustomC3Chart looks better but the reactive applyOn method is broken\n// val chart = CustomC3Chart(topCountries.collect(),\n// chartOptions = \"\"\"\n// { data: { x: 'Actor1CountryCode', \n// y: 'c',\n// type: 'bar'},\n// axis: {x: { type: 'categorical' }}\n// }\n// \"\"\")\nval chart = widgets.BarChart(topCountries.collect(), fields=Some((\"Actor1CountryCode\", \"c\")))",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "import extraTypes._\nimplicit val ITI:types.InputType[Int] = SliderType[Int](198101, 198112, 1)\nval si = new InputBox(197901, \"Choose the month (Jan to Dec 1981)\")",
"outputs" : [ ]
}, {
"metadata" : {
"trusted" : true,
"input_collapsed" : false,
"collapsed" : false
},
"cell_type" : "code",
"source" : "si.currentData --> Connection.fromObserver { monthYear: Int =>\n // Now modify the query to return new data whenever dropdown selection changes\n val newData = sqlContext.sql(s\"\"\"select Actor1CountryCode, count(*) as c from gdelt WHERE MonthYear = $monthYear \n group by Actor1CountryCode order by c desc limit 15\"\"\")\n chart.applyOn(newData.collect())\n }\n",
"outputs" : [ ]
} ],
"nbformat" : 4
}