split S3 files into smaller files to send large union file (#77)
Summary:
Pull Request resolved: #77

# Context
We found that the AWS-SDK S3 API fails when we try to write more than 5GB of data. This is blocking us from doing capacity testing for a larger FARGATE container.

In this diff, as mentioned in [the post](https://fb.workplace.com/groups/pidmatchingxfn/posts/493743615908631), we split the union file based on the number of rows.
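As a minimal illustration of the split math (the helper name here is hypothetical, not part of this diff), the number of output files is the row count divided by the per-file row cap, rounded up:

```rust
// Hypothetical helper, for illustration only. The diff itself computes the
// split count with f32 arithmetic; the integer ceiling division below is
// equivalent for typical row counts.
fn compute_num_split(total_rows: usize, max_rows_per_file: usize) -> usize {
    (total_rows + max_rows_per_file - 1) / max_rows_per_file
}

fn main() {
    // With the default cap of 5,000,000 rows, 12M rows yield 3 output files.
    assert_eq!(compute_num_split(12_000_000, 5_000_000), 3);
    assert_eq!(compute_num_split(5_000_000, 5_000_000), 1);
}
```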

# Description
We have made the following changes.
- Added a new arg `s3api_max_rows` to the private-id-multi-key-client and private-id-multi-key-server binaries. We use it to decide how many files to split into for S3 upload.
- Added an optional arg `num_split` to save_id_map() and writer_helper(). When `num_split` is specified, the arg `path` is used as a prefix and the files are saved as `{path}_0`, `{path}_1`, etc. (see the naming sketch after this list).
- In rpc_server.rs and client.rs, compute num_split based on s3api_max_rows and pass it for S3 outputs only. Then, for each split file, call copy_from_local().
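To illustrate the chunk-file naming scheme, here is a small runnable sketch (a hypothetical helper, not code from this diff):

```rust
// Hypothetical helper showing the chunk-file naming that save_id_map()
// uses when num_split is Some(n): "{path}_0" through "{path}_{n - 1}".
fn chunk_paths(path: &str, num_split: usize) -> Vec<String> {
    (0..num_split).map(|n| format!("{}_{}", path, n)).collect()
}

fn main() {
    assert_eq!(
        chunk_paths("/tmp/id_map.csv", 3),
        vec!["/tmp/id_map.csv_0", "/tmp/id_map.csv_1", "/tmp/id_map.csv_2"]
    );
}
```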

Differential Revision: D39219674

fbshipit-source-id: 82dc1788b0d4db5cf9c3de07178b52a8cc11633c
Yuya Shiraki authored and facebook-github-bot committed Sep 4, 2022
1 parent 84724a1 commit 4f28018
Showing 7 changed files with 183 additions and 44 deletions.
30 changes: 23 additions & 7 deletions protocol-rpc/src/rpc/private-id-multi-key/client.rs
@@ -100,6 +100,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.long("run_id")
.default_value("")
.help("A run_id used to identify all the logs in a PL/PA run."),
Arg::with_name("s3api_max_rows")
.long("s3api_max_rows")
.takes_value(true)
.default_value("5000000")
.help("Number of rows per each output S3 file to split."),
])
.groups(&[
ArgGroup::with_name("tls")
@@ -114,6 +119,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let global_timer = timer::Timer::new_silent("global");
let input_path_str = matches.value_of("input").unwrap_or("input.csv");
let mut input_path = input_path_str.to_string();
+    let s3api_max_rows_str = matches.value_of("s3api_max_rows").unwrap_or("5000000");
+    let s3_api_max_rows: usize = s3api_max_rows_str.to_string().parse().unwrap();
if let Ok(s3_path) = S3Path::from_str(input_path_str) {
info!(
"Reading {} from S3 and copying to local path",
@@ -358,27 +365,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let s3_tempfile = tempfile::NamedTempFile::new().unwrap();
let (_file, path) = s3_tempfile.keep().unwrap();
let path = path.to_str().expect("Failed to convert path to str");
+            let num_split = ((partner_protocol.get_id_map_size() as f32)
+                / (s3_api_max_rows as f32))
+                .ceil() as usize;
partner_protocol
.save_id_map(&String::from(path))
.save_id_map(&String::from(path), Some(num_split))
.expect("Failed to save id map to tempfile");
-            output_path_s3
-                .copy_from_local(&path)
-                .await
-                .expect("Failed to write to S3");
+            for n in 0..num_split {
+                let chunk_path = format!("{}_{}", path, n);
+                output_path_s3
+                    .copy_from_local(&chunk_path)
+                    .await
+                    .expect("Failed to write to S3");
+            }
} else if let Ok(output_path_gcp) = GCSPath::from_str(p) {
let gcs_tempfile = tempfile::NamedTempFile::new().unwrap();
let (_file, path) = gcs_tempfile.keep().unwrap();
let path = path.to_str().expect("Failed to convert path to str");
partner_protocol
.save_id_map(&String::from(path))
.save_id_map(&String::from(path), None)
.expect("Failed to save id map to tempfile");
output_path_gcp
.copy_from_local(&path)
.await
.expect("Failed to write to GCS");
} else {
+            let num_split = ((partner_protocol.get_id_map_size() as f32)
+                / (s3_api_max_rows as f32))
+                .ceil() as usize;
partner_protocol
.save_id_map(&String::from(p))
.save_id_map(&String::from(p), Some(num_split))
.expect("Failed to save id map to output file");
}
}
26 changes: 19 additions & 7 deletions protocol-rpc/src/rpc/private-id-multi-key/rpc_server.rs
@@ -43,6 +43,7 @@ pub struct PrivateIdMultiKeyService {
input_with_headers: bool,
metrics_path: Option<String>,
metrics_obj: metrics::Metrics,
+    s3_api_max_rows: usize,
pub killswitch: Arc<AtomicBool>,
}

@@ -52,6 +53,7 @@ impl PrivateIdMultiKeyService {
output_path: Option<&str>,
input_with_headers: bool,
metrics_path: Option<String>,
+        s3_api_max_rows: usize,
) -> PrivateIdMultiKeyService {
PrivateIdMultiKeyService {
protocol: CompanyPrivateIdMultiKey::new(),
@@ -60,6 +62,7 @@ impl PrivateIdMultiKeyService {
input_with_headers,
metrics_path,
metrics_obj: metrics::Metrics::new("private-id-multi-key".to_string()),
+            s3_api_max_rows,
killswitch: Arc::new(AtomicBool::new(false)),
}
}
@@ -298,26 +301,35 @@ impl PrivateIdMultiKey for PrivateIdMultiKeyService {
let s3_tempfile = tempfile::NamedTempFile::new().unwrap();
let (_file, path) = s3_tempfile.keep().unwrap();
let path = path.to_str().expect("Failed to convert path to str");
+                    let num_split = ((self.protocol.get_id_map_size() as f32)
+                        / (self.s3_api_max_rows as f32))
+                        .ceil() as usize;
self.protocol
.save_id_map(&String::from(path))
.save_id_map(&String::from(path), Some(num_split))
.expect("Failed to save id map to tempfile");
-                    output_path_s3
-                        .copy_from_local(&path)
-                        .await
-                        .expect("Failed to write to S3");
+                    for n in 0..num_split {
+                        let chunk_path = format!("{}_{}", path, n);
+                        output_path_s3
+                            .copy_from_local(&chunk_path)
+                            .await
+                            .expect("Failed to write to S3");
+                    }
} else if let Ok(output_path_gcp) = GCSPath::from_str(p) {
let gcs_tempfile = tempfile::NamedTempFile::new().unwrap();
let (_file, path) = gcs_tempfile.keep().unwrap();
let path = path.to_str().expect("Failed to convert path to str");
self.protocol
.save_id_map(&String::from(path))
.save_id_map(&String::from(path), None)
.expect("Failed to save id map to tempfile");
output_path_gcp
.copy_from_local(&path)
.await
.expect("Failed to write to GCS");
} else {
-                    self.protocol.save_id_map(p).unwrap();
+                    let num_split = ((self.protocol.get_id_map_size() as f32)
+                        / (self.s3_api_max_rows as f32))
+                        .ceil() as usize;
+                    self.protocol.save_id_map(p, Some(num_split)).unwrap();
}
}
None => self.protocol.print_id_map(),
8 changes: 8 additions & 0 deletions protocol-rpc/src/rpc/private-id-multi-key/server.rs
@@ -99,6 +99,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.long("run_id")
.default_value("")
.help("A run_id used to identify all the logs in a PL/PA run."),
+            Arg::with_name("s3api_max_rows")
+                .long("s3api_max_rows")
+                .takes_value(true)
+                .default_value("5000000")
+                .help("Number of rows per each output S3 file to split."),
])
.groups(&[
ArgGroup::with_name("tls")
@@ -129,6 +134,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let input_with_headers = matches.is_present("input-with-headers");
let output_path = matches.value_of("output");
let metric_path = matches.value_of("metric-path");
+    let s3api_max_rows_str = matches.value_of("s3api_max_rows").unwrap_or("5000000");
+    let s3_api_max_rows: usize = s3api_max_rows_str.to_string().parse().unwrap();

let no_tls = matches.is_present("no-tls");
let host = matches.value_of("host");
@@ -167,6 +174,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
output_path,
input_with_headers,
metrics_output_path,
+        s3_api_max_rows,
);

let ks = service.killswitch.clone();
14 changes: 10 additions & 4 deletions protocol/src/private_id_multi_key/company.rs
@@ -5,7 +5,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;

-use common::files;
use common::permutations::gen_permute_pattern;
use common::permutations::permute;
use common::permutations::undo_permute;
@@ -481,23 +480,30 @@ impl CompanyPrivateIdMultiKeyProtocol for CompanyPrivateIdMultiKey {
fn print_id_map(&self) {
match (self.plaintext.clone().read(), self.id_map.clone().read()) {
(Ok(data), Ok(id_map)) => {
writer_helper(&data, &id_map, None);
writer_helper(&data, &id_map, None, None);
}
_ => panic!("Cannot print id_map"),
}
}

fn save_id_map(&self, path: &str) -> Result<(), ProtocolError> {
fn save_id_map(&self, path: &str, num_split: Option<usize>) -> Result<(), ProtocolError> {
match (self.plaintext.clone().read(), self.id_map.clone().read()) {
(Ok(data), Ok(id_map)) => {
writer_helper(&data, &id_map, Some(path.to_string()));
writer_helper(&data, &id_map, Some(path.to_string()), num_split);
Ok(())
}
_ => Err(ProtocolError::ErrorIO(
"Unable to write partner view to file".to_string(),
)),
}
}

+    fn get_id_map_size(&self) -> usize {
+        match self.id_map.clone().read() {
+            Ok(id_map) => id_map.len(),
+            _ => panic!("Cannot get id_map size"),
+        }
+    }
}

#[cfg(test)]
59 changes: 39 additions & 20 deletions protocol/src/private_id_multi_key/mod.rs
@@ -62,34 +62,53 @@ fn load_data(plaintext: Arc<RwLock<Vec<Vec<String>>>>, path: &str, input_with_headers: bool)
t.qps("text read", text_len);
}

-fn writer_helper(data: &[Vec<String>], id_map: &[(String, usize, bool)], path: Option<String>) {
-    let mut device = match path {
-        Some(path) => {
-            let wr = csv::WriterBuilder::new()
-                .flexible(true)
-                .buffer_capacity(1024)
-                .from_path(path)
-                .unwrap();
-            Some(wr)
-        }
-        None => None,
-    };
+fn writer_helper(
+    data: &[Vec<String>],
+    id_map: &[(String, usize, bool)],
+    path: Option<String>,
+    num_split: Option<usize>,
+) {
+    let mut device_list = Vec::new();
+    let mut chunk_size = id_map.len();
+    match path {
+        Some(path) => match num_split {
+            Some(num_split) => {
+                for n in 0..num_split {
+                    let chunk_path = format!("{}_{}", path, n);
+                    let wr = csv::WriterBuilder::new()
+                        .flexible(true)
+                        .buffer_capacity(1024)
+                        .from_path(chunk_path)
+                        .unwrap();
+                    device_list.push(wr);
+                    chunk_size = ((id_map.len() as f32) / (num_split as f32)).ceil() as usize;
+                }
+            }
+            None => {
+                let wr = csv::WriterBuilder::new()
+                    .flexible(true)
+                    .buffer_capacity(1024)
+                    .from_path(path)
+                    .unwrap();
+                device_list.push(wr);
+            }
+        },
+        None => (),
+    }

-    for (key, idx, flag) in id_map.iter() {
+    for (pos, (key, idx, flag)) in id_map.iter().enumerate() {
let mut v = vec![(*key).clone()];

match flag {
true => v.extend(data[*idx].clone()),
false => v.push("NA".to_string()),
}

-        match device {
-            Some(ref mut wr) => {
-                wr.write_record(v.as_slice()).unwrap();
-            }
-            None => {
-                println!("{}", v.join(","));
-            }
+        if device_list.is_empty() {
+            println!("{}", v.join(","));
+        } else {
+            let device = &mut device_list[pos / chunk_size];
+            device.write_record(v.as_slice()).unwrap();
}
}
}
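For reference, a small runnable sketch (illustrative, not part of the diff) of how `pos / chunk_size` distributes rows across the chunk files in the new `writer_helper`:

```rust
// With 10 id-map rows and num_split = 3, chunk_size = ceil(10 / 3) = 4,
// so rows 0..4 go to file 0, rows 4..8 to file 1, and rows 8..10 to file 2.
fn main() {
    let (rows, num_split) = (10usize, 3usize);
    let chunk_size = ((rows as f32) / (num_split as f32)).ceil() as usize; // 4
    let assignment: Vec<usize> = (0..rows).map(|pos| pos / chunk_size).collect();
    assert_eq!(assignment, vec![0, 0, 0, 0, 1, 1, 1, 1, 2, 2]);
}
```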