simplified to download the whole file
darthbear committed May 5, 2019
1 parent 2616af9 commit 433abb2
Showing 2 changed files with 7 additions and 51 deletions.
34 changes: 1 addition & 33 deletions README.md
@@ -181,36 +181,4 @@ psql> select * from animals;
parrot | 103
tortoise | 205
(4 rows)
```

### Configuration

One can tune the following parameters:
* `aws_s3.buffer_chunk_size` (chunk size used when reading data from S3). Default is 262,144 bytes.
* `aws_s3.buffer_num_lines` (number of lines imported into the table per batch). Default is 1,000 lines.

One can update these parameters at the session level or at the database level.

At the session level:
```postgresql
SET aws_s3.buffer_chunk_size = 1000000;
SET aws_s3.buffer_num_lines = 1000;
```

At the database level:
```postgresql
ALTER DATABASE db SET aws_s3.buffer_chunk_size = 1000000;
ALTER DATABASE db SET aws_s3.buffer_num_lines = 1000;
-- Then reconnect; the new settings take effect in new sessions
```

Then to check the values of the settings:
```postgresql
SELECT current_setting('aws_s3.buffer_chunk_size', true) AS chunk_size, current_setting('aws_s3.buffer_num_lines', true) as num_lines;
chunk_size | num_lines
------------+-----------
1000000 | 1000
(1 row)
```
24 changes: 6 additions & 18 deletions aws_s3--0.0.1.sql
@@ -66,9 +66,7 @@ AS $$
     boto3 = cache_import('boto3')
     tempfile = cache_import('tempfile')
 
-    plan = plpy.prepare('select current_setting($1, true)', ['TEXT'])
-    buffer_chunk_size = plan.execute(['aws_s3.buffer_chunk_size'])[0]['current_setting'] or 131072
-    buffer_num_lines = plan.execute(['aws_s3.buffer_num_lines'])[0]['current_setting'] or 10000
+    plan = plpy.prepare('select current_setting($1, true)::int', ['TEXT'])
 
     s3 = boto3.client(
         's3',
@@ -78,27 +76,17 @@
         region_name=region
     )
 
-    def copy_rows(fd):
+    with tempfile.NamedTemporaryFile() as fd:
+        s3.download_fileobj(bucket, file_path, fd)
         fd.flush()
-        res = plpy.execute("COPY {table_name} {column_list} FROM '{filename}' {options};".format(
+        res = plpy.execute("COPY {table_name} {column_list} FROM {filename} {options};".format(
             table_name=table_name,
+            filename=plpy.quote_literal(fd.name),
             column_list=column_list,
-            filename=fd.name,
             options=options
         )
         )
-        fd.seek(0)
-
-    lines = s3.get_object(Bucket=bucket, Key=file_path)['Body'].iter_lines(chunk_size=buffer_chunk_size)
-    with tempfile.NamedTemporaryFile() as fd:
-        for line_num, line in enumerate(lines):
-            if (line_num + 1) % buffer_num_lines == 0:
-                copy_rows(fd)
-
-            fd.write(line + '\n')
-
-        copy_rows(fd)
-    return line_num
+    return res.nrows()
 $$;
 
 --
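For readability, here is how the simplified PL/Python body reads after this commit, assembled from the two hunks above. This is a sketch, not the verbatim file: the `CREATE FUNCTION` signature, the `cache_import` helper, and the credential arguments passed to `boto3.client` are not visible in these hunks and are assumed. The body runs inside the function's `$$ … $$` block, which is why the bare `return` is valid.

```python
# Function body after this commit, pieced together from the diff above.
# Parts not shown in the hunks are assumptions, marked below.
boto3 = cache_import('boto3')
tempfile = cache_import('tempfile')

# Added by this commit; not referenced within the hunks shown here.
plan = plpy.prepare('select current_setting($1, true)::int', ['TEXT'])

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,        # assumed: credentials come from the
    aws_secret_access_key=secret_key,    # function parameters, which these
    aws_session_token=session_token,     # hunks do not show
    region_name=region
)

with tempfile.NamedTemporaryFile() as fd:
    # Download the whole S3 object into a temporary file in one call,
    # replacing the previous buffered, line-by-line streaming.
    s3.download_fileobj(bucket, file_path, fd)
    fd.flush()
    # Bulk-load the downloaded file server-side; quote the temp file path.
    res = plpy.execute("COPY {table_name} {column_list} FROM {filename} {options};".format(
        table_name=table_name,
        filename=plpy.quote_literal(fd.name),
        column_list=column_list,
        options=options
    ))
return res.nrows()
```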
