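"""
producer_api_to_topic_kafka.py

Pulls ticket records from a paginated REST API using incremental-export windows
(start_time / end_time) and publishes each record as JSON to the Kafka topic
'kttm' on a local broker. A local Spark session is used to normalize the payload
and stamp each row with a produce timestamp. The URL and token values below are
placeholders.
"""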
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
# Imports
import json
import time
import datetime
from kafka import KafkaProducer
import requests
# Spark setup
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("POC") \
    .master("local[2]") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()
# Producer pointed at a local Kafka broker; records are sent as raw JSON-encoded bytes
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# Functions
def dataframe_to_rdd(df):
    """Convert a DataFrame into an RDD of dicts, stamping each row with the time it is produced."""
    rdd = df.rdd.map(lambda row: row.asDict(True))

    def transform(row):
        row["timestamp_kafka"] = str(datetime.datetime.now())
        return row

    return rdd.map(transform)
def rdd_to_tuple(rdd):
    """Collect the RDD to the driver and return its rows as a tuple."""
    return tuple(rdd.collect())
def get_last_process():
    """Read the timestamp of the last processed window from a checkpoint CSV (path not present in the source)."""
    df_get = spark.read.csv( ).withColumnRenamed('_c0', 'last_process')
    time_lp = df_get.select('last_process').collect()
    return time_lp[0].last_process
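# Note: get_last_process is not invoked below; time_ini is set to a fixed epoch instead.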
# Variables
body = []
lenResponse = 1000
iterationCount = 0
URL = "url-hash"
# TOKEN = "token-hash"
TOKEN = "basic 64 token hash"
# Starting point of the export window (Unix epoch seconds)
time_ini = 1683299700
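# Incremental export: each call requests records from time_ini onward; the API
# returns up to 1000 records per page plus an 'end_time' cursor, which becomes
# the next page's start_time. The loop stops on a short page (< 1000 records)
# or after 10 iterations.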
# Start iterating over the API
while lenResponse == 1000 and iterationCount < 10:
    # Flag set to 1 when the call hits the rate limit and has to be retried
    split = 0
    # Parameters for the request
    querystring = {"start_time": time_ini}
    headers = {
        'content-type': "application/json",
        'authorization': TOKEN,
    }
    # Run the request against the API
    response = requests.request(
        "GET", URL, headers=headers, params=querystring
    )
    body_list = json.loads(response.content.decode('utf-8'))
    # Handle the API response
    try:
        body_page = body_list['tickets']
    except KeyError:
        # On hitting the request limit, wait 1 minute and retry the same window
        msg_error = body_list['error']
        if msg_error == 'APIRateLimitExceeded':
            time.sleep(60)
            split = 1
        else:
            raise Exception(body_list['description'])
    # A successful request moves the window forward and publishes the page
    if split == 0:
        time_ini = body_list['end_time']
        print(time_ini)
        # Count the records returned by this call
        lenResponse = len(body_page)
        # Build a DataFrame from the page and convert it to a tuple of dicts
        rdd_json = spark.sparkContext.parallelize(body_page).map(lambda x: json.dumps(x))
        df = spark.read.json(rdd_json)
        tup = rdd_to_tuple(dataframe_to_rdd(df))
        # Publish each record to the Kafka topic
        for row in tup:
            producer.send('kttm', json.dumps(row).encode('utf-8'))
    iterationCount += 1

# Deliver any buffered records before shutting down
producer.flush()
spark.stop()
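# To run locally (assumes a Kafka broker on localhost:9092 with the 'kttm' topic
# and network access to the source API):
#   python producer_api_to_topic_kafka.py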