Skip to content

Commit

Permalink
[HWORKS-526] Remove inode foreign key from validation report (#1366)
Browse files Browse the repository at this point in the history
  • Loading branch information
gibchikafa authored May 20, 2023
1 parent cf66cfe commit afead64
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
import io.hops.hopsworks.common.featurestore.datavalidationv2.reports.ValidationReportController;
import io.hops.hopsworks.common.featurestore.datavalidationv2.reports.ValidationReportDTO;
import io.hops.hopsworks.common.featurestore.datavalidationv2.results.ValidationResultDTO;
import io.hops.hopsworks.common.hdfs.inode.InodeController;
import io.hops.hopsworks.persistence.entity.dataset.Dataset;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidationv2.ValidationReport;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidationv2.ValidationResult;
import io.hops.hopsworks.persistence.entity.project.Project;
import org.apache.hadoop.fs.Path;

import javax.ejb.EJB;
import javax.ejb.Stateless;
Expand All @@ -35,6 +36,7 @@
import javax.ws.rs.core.UriInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

@Stateless
Expand All @@ -45,8 +47,6 @@ public class ValidationReportBuilder {
private ValidationReportController validationReportController;
@EJB
private ValidationResultBuilder validationResultBuilder;
@EJB
private InodeController inodeController;


public ValidationReportDTO uri(ValidationReportDTO dto, UriInfo uriInfo, Project project,
Expand All @@ -64,14 +64,22 @@ public ValidationReportDTO build(UriInfo uriInfo, Project project,
Featuregroup featuregroup, ValidationReport validationReport) {
ValidationReportDTO dto = new ValidationReportDTO();
uri(dto, uriInfo, project, featuregroup);

Optional<Dataset> validationDatasetOptional = validationReportController.getValidationDataset(project);
String path = "";
// the validation dataset will probably always be there if we have the validation reports in db
if (validationDatasetOptional.isPresent()) {
Path validationReportDir = validationReportController
.getValidationReportDirFullPath(featuregroup, validationDatasetOptional.get());
Path validationReportFileFullPath = new Path(validationReportDir, validationReport.getFileName());
path = validationReportFileFullPath.toString();
}
dto.setMeta(validationReport.getMeta());
dto.setId(validationReport.getId());
dto.setSuccess(validationReport.getSuccess());
dto.setStatistics(validationReport.getStatistics());
dto.setValidationTime(validationReport.getValidationTime());
dto.setEvaluationParameters(validationReport.getEvaluationParameters());
dto.setFullReportPath(inodeController.getPath(validationReport.getInode()));
dto.setFullReportPath(path);
dto.setIngestionResult(validationReport.getIngestionResult());

List<ValidationResultDTO> validationResultDTOs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidationv2.IngestionResult;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidationv2.ValidationReport;
import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidationv2.ValidationResult;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;
import io.hops.hopsworks.persistence.entity.project.Project;
import io.hops.hopsworks.persistence.entity.project.alert.ProjectServiceAlert;
import io.hops.hopsworks.persistence.entity.project.alert.ProjectServiceAlertStatus;
import io.hops.hopsworks.persistence.entity.project.service.ProjectServiceEnum;
import io.hops.hopsworks.persistence.entity.user.Users;
import io.hops.hopsworks.restutils.RESTCodes;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
Expand Down Expand Up @@ -204,24 +204,29 @@ public String constructAlertDescription(ValidationReport validationReport) {
}

/**
 * Deletes a validation report: first the database entity, then the backing
 * JSON report file on the file system.
 *
 * NOTE(review): the DB row is removed before the file; if the file removal
 * fails the row is already gone and the file is orphaned — confirm this
 * best-effort ordering is intended.
 *
 * @param user user performing the deletion (used for HDFS impersonation)
 * @param validationReportId id of the validation report to delete
 * @throws FeaturestoreException if no report with the given id exists, or the
 *         on-disk report file cannot be removed
 */
public void deleteValidationReportById(Users user, Integer validationReportId) throws FeaturestoreException {
  ValidationReport validationReport = validationReportFacade.findById(validationReportId).orElseThrow(() ->
    new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.VALIDATION_REPORT_NOT_FOUND, Level.WARNING,
      String.format("ValidationReport with id : %d was not found causing delete to fail", validationReportId)));
  validationReportFacade.delete(validationReport);
  deleteValidationReportFile(user, validationReport);
}

private void deleteSingleReportInode(Users user, ValidationReport validationReport)
private void deleteValidationReportFile(Users user, ValidationReport validationReport)
throws FeaturestoreException {
Featuregroup featuregroup = validationReport.getFeaturegroup();
Project project = featuregroup.getFeaturestore().getProject();
Optional<Dataset> validationDatasetOptional = getValidationDataset(project);
if (!validationDatasetOptional.isPresent()) {
return;
}
Path validationDirFullPath = getValidationReportDirFullPath(validationReport.getFeaturegroup(),
validationDatasetOptional.get());
Path validationFileFullPath = new Path(validationDirFullPath, validationReport.getFileName());
DistributedFileSystemOps udfso = null;
try {
udfso = dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
// delete json files
udfso.rm(inodeController.getPath(validationReport.getInode()), false);
udfso.rm(validationFileFullPath, false);
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_DELETING_ON_DISK_VALIDATION_REPORT,
Level.WARNING, "", e.getMessage(), e);
Expand All @@ -233,13 +238,17 @@ private void deleteSingleReportInode(Users user, ValidationReport validationRepo
public void deleteFeaturegroupDataValidationDir(Users user, Featuregroup featuregroup)
throws FeaturestoreException {
Project project = featuregroup.getFeaturestore().getProject();
Optional<Dataset> validationDataset = getValidationDataset(project);
if (!validationDataset.isPresent()) {
return;
}
DistributedFileSystemOps udfso = null;
try {
udfso = dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));
Dataset dataValidationDir = getOrCreateDataValidationDataset(project, user);
Dataset dataValidationDir = validationDataset.get();
Path targetDir = new Path(datasetController.getDatasetPath(dataValidationDir), featuregroup.getName());
udfso.rm(targetDir, true);
} catch (DatasetException | HopsSecurityException | IOException e) {
} catch (IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_DELETING_ON_DISK_VALIDATION_REPORT,
Level.WARNING, "", e.getMessage(), e);
} finally {
Expand Down Expand Up @@ -274,8 +283,8 @@ public ValidationReport convertReportDTOToPersistent(Users user, Featuregroup fe
report.setValidationTime(validationTime);

// Dump the whole report to a file.
Inode reportInode = registerValidationReportToDisk(user, featuregroup, reportDTO, validationTime);
report.setInode(reportInode);
String validationReportFileName = registerValidationReportToDisk(user, featuregroup, reportDTO, validationTime);
report.setFileName(validationReportFileName);

List<ValidationResult> results = new ArrayList<ValidationResult>();

Expand All @@ -287,32 +296,14 @@ public ValidationReport convertReportDTOToPersistent(Users user, Featuregroup fe
return report;
}

private Inode registerValidationReportToDisk(Users user, Featuregroup featuregroup,
private String registerValidationReportToDisk(Users user, Featuregroup featuregroup,
ValidationReportDTO reportDTO, Date validationTime) throws FeaturestoreException {
DistributedFileSystemOps udfso = null;
Project project = featuregroup.getFeaturestore().getProject();

JSONObject reportJSON = convertValidationReportDTOToJson(reportDTO);

try {
udfso = dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));

// Dataset is confusing terminology. Get path to on_disk dataValidationDir
Dataset dataValidationDir = getOrCreateDataValidationDataset(project, user);

// All validation report attached to a particular featuregroup version will be stored in same directory
Path reportDirPath = new Path(datasetController.getDatasetPath(dataValidationDir), featuregroup.getName());
if (!udfso.isDir(reportDirPath.toString())) {
udfso.mkdir(reportDirPath.toString());
}
reportDirPath = new Path(reportDirPath, featuregroup.getVersion().toString());
if (!udfso.isDir(reportDirPath.toString())) {
udfso.mkdir(reportDirPath.toString());
}
reportDirPath = new Path(reportDirPath, "ValidationReports");
if (!udfso.isDir(reportDirPath.toString())) {
udfso.mkdir(reportDirPath.toString());
}
udfso = dfs.getDfsOps(hdfsUsersController.getHdfsUserName(featuregroup.getFeaturestore().getProject(), user));
Path reportDirPath = createValidationReportDirPath(user, featuregroup);
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HHmmss");
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
String fileName = String.format("validation_report_%s.json", formatter.format(validationTime));
Expand All @@ -322,30 +313,54 @@ private Inode registerValidationReportToDisk(Users user, Featuregroup featuregro
Level.SEVERE, String.format("Validation report with file name %s already exists.", fileName));
}
udfso.create(reportPath, reportJSON.toString());

return inodeController.getInodeAtPath(reportPath.toString());
} catch (DatasetException | HopsSecurityException | IOException e) {
return fileName;
} catch (FeaturestoreException | IOException e) {
throw new FeaturestoreException(RESTCodes.FeaturestoreErrorCode.ERROR_SAVING_ON_DISK_VALIDATION_REPORT,
Level.WARNING, e.getMessage());
} finally {
dfs.closeDfsClient(udfso);
}

}

private Dataset getOrCreateDataValidationDataset(Project project, Users user)
throws DatasetException, HopsSecurityException {
Optional<Dataset> dataValidationDataset = project.getDatasetCollection().stream()
.filter(d -> d.getName().equals(Settings.ServiceDataset.DATAVALIDATION.getName()))
.findFirst();
// This is the case of an old project without DATAVALIDATION dataset, create it.
if (dataValidationDataset.isPresent()) {
return dataValidationDataset.get();
} else {
return createDataValidationDataset(project, user);
/**
 * Ensures the on-disk directory that stores validation reports for the given
 * feature group exists and returns its full path
 * (DATAVALIDATION dataset / feature group name / version / ValidationReports).
 *
 * Creates the project's DATAVALIDATION dataset first if the project does not
 * have one yet (old projects).
 *
 * @param user user on whose behalf the HDFS operations are performed
 * @param featuregroup feature group the validation reports belong to
 * @return full path of the validation report directory
 * @throws FeaturestoreException if the dataset or directory cannot be created
 */
public Path createValidationReportDirPath(Users user, Featuregroup featuregroup)
  throws FeaturestoreException {
  DistributedFileSystemOps udfso = null;
  Project project = featuregroup.getFeaturestore().getProject();
  try {
    udfso = dfs.getDfsOps(hdfsUsersController.getHdfsUserName(project, user));

    // Check presence BEFORE dereferencing the Optional: calling get() on an
    // empty Optional throws NoSuchElementException, which would escape the
    // catch below. For old projects without the dataset, create it and use
    // the newly created one.
    Optional<Dataset> validationDatasetOptional = getValidationDataset(project);
    Dataset validationDataset = validationDatasetOptional.isPresent()
      ? validationDatasetOptional.get()
      : createDataValidationDataset(project, user);
    // create recursively
    Path fullPath = getValidationReportDirFullPath(featuregroup, validationDataset);
    udfso.mkdirs(fullPath, FsPermission.getDefault());
    return fullPath;
  } catch (DatasetException | HopsSecurityException | IOException e) {
    // Preserve the original exception as the cause instead of only its message,
    // consistent with the other error handlers in this controller.
    throw new FeaturestoreException(
      RESTCodes.FeaturestoreErrorCode.ERROR_CONSTRUCTING_VALIDATION_REPORT_DIRECTORY_PATH, Level.WARNING,
      "", e.getMessage(), e);
  } finally {
    dfs.closeDfsClient(udfso);
  }
}

/**
 * Builds the full path of the directory holding validation reports for a
 * feature group: dataset root / feature group name / version / "ValidationReports".
 * Pure path construction — does not touch the file system.
 *
 * @param featuregroup feature group whose report directory path is wanted
 * @param dataValidationDir the project's on-disk DATAVALIDATION dataset
 *        ("Dataset" here is the Hopsworks storage dataset, not an ML dataset)
 * @return the validation report directory path
 */
public Path getValidationReportDirFullPath(Featuregroup featuregroup, Dataset dataValidationDir) {
  Path featuregroupDir = new Path(datasetController.getDatasetPath(dataValidationDir), featuregroup.getName());
  Path versionDir = new Path(featuregroupDir, featuregroup.getVersion().toString());
  return new Path(versionDir, "ValidationReports");
}

/**
 * Looks up the project's DATAVALIDATION service dataset among the project's
 * datasets.
 *
 * @param project project to search
 * @return the DATAVALIDATION dataset, or an empty Optional if the project
 *         does not have one (e.g. old projects)
 */
public Optional<Dataset> getValidationDataset(Project project) {
  for (Dataset dataset : project.getDatasetCollection()) {
    if (dataset.getName().equals(Settings.ServiceDataset.DATAVALIDATION.getName())) {
      return Optional.of(dataset);
    }
  }
  return Optional.empty();
}

private Dataset createDataValidationDataset(Project project, Users user)
throws DatasetException, HopsSecurityException {
// Needs super user privileges as we are creating a dataset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public Optional<ValidationReport> findFeaturegroupLatestValidationReport(Feature
return latestReport;
}

/**
 * Removes a validation report entity in its own transaction.
 * A null argument is a no-op.
 *
 * @param validationReport the (possibly detached) report entity to remove
 */
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void delete(ValidationReport validationReport) {
  if (validationReport == null) {
    return;
  }
  // merge() reattaches the possibly-detached entity before remove()
  em.remove(em.merge(validationReport));
}

private void setFilter(Set<? extends AbstractFacade.FilterBy> filter, Query q) {
if (filter == null || filter.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package io.hops.hopsworks.persistence.entity.featurestore.featuregroup.datavalidationv2;

import io.hops.hopsworks.persistence.entity.featurestore.featuregroup.Featuregroup;
import io.hops.hopsworks.persistence.entity.hdfs.inode.Inode;

import javax.persistence.Basic;
import javax.persistence.CascadeType;
import javax.persistence.Column;
Expand All @@ -29,16 +27,15 @@
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.JoinColumn;
import javax.persistence.JoinColumns;
import javax.persistence.ManyToOne;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.Serializable;
import java.util.Collection;
Expand Down Expand Up @@ -79,15 +76,10 @@ public class ValidationReport implements Serializable {
@Column(name = "statistics")
private String statistics;

@JoinColumns({
@JoinColumn(name = "inode_pid",
referencedColumnName = "parent_id"),
@JoinColumn(name = "inode_name",
referencedColumnName = "name"),
@JoinColumn(name = "partition_id",
referencedColumnName = "partition_id")})
@OneToOne(optional = false)
private Inode inode;
@Basic
@Column(name = "file_name")
@Size(max = 255)
private String fileName;

@Basic
@Column(name = "meta")
Expand Down Expand Up @@ -141,12 +133,12 @@ public void setMeta(String meta) {
this.meta = meta;
}

public Inode getInode() {
return inode;
public String getFileName() {
return fileName;
}

public void setInode(Inode inode) {
this.inode = inode;
public void setFileName(String fileName) {
this.fileName = fileName;
}

public String getEvaluationParameters() {
Expand Down Expand Up @@ -190,7 +182,6 @@ public void setValidationResults(
this.validationResults = validationResults;
}


@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,10 @@ public enum FeaturestoreErrorCode implements RESTErrorCode {
CONNECTION_CHECKER_ERROR(220, "Failure in testing connection for storage connector",
Response.Status.BAD_REQUEST),
ERROR_CREATING_ONLINE_FEATURESTORE_KAFKA_OFFSET_TABLE(221, "An error occurred when trying to " +
"create the kafka offset table for an online feature store", Response.Status.INTERNAL_SERVER_ERROR);
"create the kafka offset table for an online feature store", Response.Status.INTERNAL_SERVER_ERROR),
ERROR_CONSTRUCTING_VALIDATION_REPORT_DIRECTORY_PATH(222,
"An error occurred while constructing validation report directory path",
Response.Status.INTERNAL_SERVER_ERROR);

private int code;
private String message;
Expand Down

0 comments on commit afead64

Please sign in to comment.