Skip to content

Commit

Permalink
fix formatting for spark materilization engine
Browse files Browse the repository at this point in the history
Signed-off-by: tokoko <[email protected]>
  • Loading branch information
tokoko committed Feb 6, 2024
1 parent f925a90 commit 4c16133
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,8 @@ def _materialize_one(
)

spark_df.mapInPandas(
lambda x: _map_by_partition(x, spark_serialized_artifacts),
"status int"
).count()
lambda x: _map_by_partition(x, spark_serialized_artifacts), "status int"
).count() # dummy action to force evaluation

return SparkMaterializationJob(
job_id=job_id, status=MaterializationJobStatus.SUCCEEDED
Expand Down Expand Up @@ -235,8 +234,11 @@ def _map_by_partition(iterator, spark_serialized_artifacts: _SparkSerializedArti

table = pyarrow.Table.from_pandas(pdf)

# unserialize artifacts
feature_view, online_store, repo_config = spark_serialized_artifacts.unserialize()
(
feature_view,
online_store,
repo_config,
) = spark_serialized_artifacts.unserialize()

if feature_view.batch_source.field_mapping is not None:
table = _run_pyarrow_field_mapping(
Expand All @@ -248,12 +250,16 @@ def _map_by_partition(iterator, spark_serialized_artifacts: _SparkSerializedArti
for entity in feature_view.entity_columns
}

rows_to_write = _convert_arrow_to_proto(table, feature_view, join_key_to_value_type)
rows_to_write = _convert_arrow_to_proto(
table, feature_view, join_key_to_value_type
)
online_store.online_write_batch(
repo_config,
feature_view,
rows_to_write,
lambda x: None,
)

yield pd.DataFrame([pd.Series(range(1, 2))]) # dummy result because mapInPandas needs to return something
yield pd.DataFrame(
[pd.Series(range(1, 2))]
) # dummy result because mapInPandas needs to return something
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
"grpcio-testing>=1.56.2,<2",
"minio==7.1.0",
"mock==2.0.0",
"moto",
"moto<5",
"mypy>=0.981,<0.990",
"avro==1.10.0",
"fsspec<2023.10.0",
Expand Down

0 comments on commit 4c16133

Please sign in to comment.