spark rollfun disable median
jangorecki authored Jul 25, 2023
1 parent a18c7c5 commit e064aab
Showing 1 changed file with 32 additions and 32 deletions.
64 changes: 32 additions & 32 deletions spark/rollfun-spark.py
@@ -186,38 +186,38 @@
spark.catalog.uncacheTable("ans")
del ans, sql

question = "median" # q5
sql = f'select median(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x'
gc.collect()
t_start = timeit.default_timer()
ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY)
print((ans.count(), len(ans.columns)), flush=True) # shape
t = timeit.default_timer() - t_start
m = memory_usage()
ans.createOrReplaceTempView("ans")
t_start = timeit.default_timer()
chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
ans.unpersist()
spark.catalog.uncacheTable("ans")
del ans
gc.collect()
t_start = timeit.default_timer()
ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY)
print((ans.count(), len(ans.columns)), flush=True) # shape
t = timeit.default_timer() - t_start
m = memory_usage()
ans.createOrReplaceTempView("ans")
t_start = timeit.default_timer()
chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
ans.unpersist()
spark.catalog.uncacheTable("ans")
del ans, sql
#question = "median" # q5
#sql = f'select median(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x'
#gc.collect()
#t_start = timeit.default_timer()
#ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY)
#print((ans.count(), len(ans.columns)), flush=True) # shape
#t = timeit.default_timer() - t_start
#m = memory_usage()
#ans.createOrReplaceTempView("ans")
#t_start = timeit.default_timer()
#chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#ans.unpersist()
#spark.catalog.uncacheTable("ans")
#del ans
#gc.collect()
#t_start = timeit.default_timer()
#ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY)
#print((ans.count(), len(ans.columns)), flush=True) # shape
#t = timeit.default_timer() - t_start
#m = memory_usage()
#ans.createOrReplaceTempView("ans")
#t_start = timeit.default_timer()
#chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']]
#chkt = timeit.default_timer() - t_start
#write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
#print(ans.head(3), flush=True)
#print(ans.tail(3), flush=True)
#ans.unpersist()
#spark.catalog.uncacheTable("ans")
#del ans, sql

question = "multiroll" # q6
sql = f'select avg(v1) over (order by id1 rows between {w-51} preceding and current row) as v1_small, avg(v1) over (order by id1 rows between {w+49} preceding and current row) as v1_big, avg(v2) over (order by id1 rows between {w-51} preceding and current row) as v2_small, avg(v2) over (order by id1 rows between {w+49} preceding and current row) as v2_big from x'

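For reference, the q5 query disabled by this commit can be run standalone. The sketch below is a minimal, non-authoritative example: it assumes Spark 3.4+ (where the median SQL aggregate is available), a local session, and a small synthetic table named x with columns id1 and v1; only the window query itself is taken verbatim from the benchmark script, and the window width w = 100 is an illustrative stand-in.

# Minimal standalone sketch of the disabled rolling-median (q5) query.
# Assumptions: Spark 3.4+ (median SQL aggregate), local session, toy data;
# only the window query mirrors the benchmark script in the diff above.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("rollfun-median-sketch")
         .getOrCreate())

w = 100  # rolling window width, illustrative stand-in for the benchmark's w

# Toy input with the columns the query expects: id1 (ordering key) and v1.
x = spark.range(1, 10001).selectExpr("id as id1", "rand(42) as v1")
x.createOrReplaceTempView("x")

# Same window query as in the diff above (commented out by this commit).
sql = f'select median(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x'
ans = spark.sql(sql)
print((ans.count(), len(ans.columns)), flush=True)  # shape, as the benchmark prints it
print(ans.head(3), flush=True)

spark.stop()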