stream-process.py
# - read from any kafka
# - do computation
# - write back to kafka
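#
# A typical invocation would look like the sketch below; the Spark package
# coordinates and topic names here are assumptions for illustration, not part
# of the original project:
#
#   spark-submit \
#       --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.5 \
#       stream-process.py stock-analyzer average-stock-price localhost:9092
#
# Note: pyspark.streaming.kafka (KafkaUtils) belongs to the old Kafka 0.8
# integration, which was removed in Spark 3.x, so a Spark 2.x runtime is assumed.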
import sys
import json
import logging
import time
import atexit
from kafka import KafkaProducer
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
logging.basicConfig()
logger = logging.getLogger('stream-process')
logger.setLevel(logging.INFO)
topic = ""
new_topic = ""
kafka_broker = ""
kafka_producer = None

def shutdown_hook(producer):
    producer.flush(10)
    producer.close(10)
    logger.info('producer closed')
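
# Each incoming Kafka message value is expected to be a JSON array whose first
# element carries a 'LastTradePrice' field; the sample below only illustrates
# that assumed format and is not data from the original project:
#   [{"LastTradePrice": "170.00"}]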

def process(timeobj, rdd):
    # - calculate the average price over all records in this micro-batch
    num_of_records = rdd.count()
    if num_of_records == 0:
        logger.info('no data received')
        return

    price_sum = rdd \
        .map(lambda record: float(json.loads(record[1].decode('utf-8'))[0].get('LastTradePrice'))) \
        .reduce(lambda a, b: a + b)
    average = price_sum / num_of_records
    logger.info('received %d records, average price is %f', num_of_records, average)

    # - publish the result back to kafka as a JSON payload
    current_time = time.time()
    data = json.dumps({
        'timestamp': current_time,
        'average': average
    })
    try:
        kafka_producer.send(target_topic, value=data.encode('utf-8'))
    except Exception as e:
        logger.warning('failed to send data to kafka: %s', e)

if __name__ == '__main__':
    if len(sys.argv) != 4:
        print('usage: stream-process.py <source-topic> <target-topic> <kafka-broker>')
        sys.exit(1)
    topic, target_topic, kafka_broker = sys.argv[1:]

    # - create spark context and streaming context
    sc = SparkContext("local[2]", "StockAveragePrice")  # local[2]: run locally with 2 worker threads
    sc.setLogLevel("INFO")
    ssc = StreamingContext(sc, 5)  # batch interval of 5 seconds

    # - create a direct kafka stream for processing
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': kafka_broker})

    # - create a producer for writing results back to kafka
    kafka_producer = KafkaProducer(
        bootstrap_servers=kafka_broker
    )

    directKafkaStream.foreachRDD(process)
    atexit.register(shutdown_hook, kafka_producer)

    ssc.start()
    ssc.awaitTermination()  # block until the job is terminated (e.g. by keyboard interrupt)
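
# To sanity-check the output, one could tail the target topic with the standard
# Kafka console consumer; broker address and topic name below are assumptions:
#
#   kafka-console-consumer.sh --bootstrap-server localhost:9092 \
#       --topic average-stock-price --from-beginning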