feat: add partition option
hmatalonga committed Sep 11, 2019
1 parent 7fa6ca4 commit e0cadc8
Showing 7 changed files with 101 additions and 52 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -5,7 +5,7 @@
# tags from Docker Hub.
FROM python:3.7-slim

LABEL Name=dataset-converter Version=0.1.1
LABEL Name=dataset-converter Version=0.2.0
LABEL maintainer="Hugo Matalonga <[email protected]>"

ARG UID=1000
@@ -32,4 +32,4 @@ RUN chown -R user:user /home/user
USER user
ENV PATH=${PATH}:/home/user/.local/bin

ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
CMD ["/usr/local/bin/entrypoint.sh"]
8 changes: 8 additions & 0 deletions README.md
@@ -85,6 +85,14 @@ Default: `true`

Whether or not to compress the parquet files to `7z` format. When compression is on, the uncompressed parquet files are deleted afterwards. This argument is optional.

#### partition

Type: `boolean`

Default: `false`

Whether to export each processed chunk to a separate parquet file or merge everything into a single parquet file. This argument is optional.
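
For example, a configuration that processes a large CSV in chunks and writes each chunk to its own parquet file might look like this (values are illustrative, following the format of `config/samples.yml.example`):

name: "samples.csv"
chunksize: 1000000
compression: true
partition: true

Note that partitioned export only takes effect when `chunksize` is also set; each chunk is then written as `samples.<step>.parquet`, and with compression enabled the per-chunk files are packed into a single `7z` archive.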

#### chunksize

Type: `number`
123 changes: 78 additions & 45 deletions app/app.py
@@ -15,6 +15,8 @@


data_path = os.path.abspath('./data')
config_path = os.path.abspath('./config')
output_path = os.path.abspath('./output')


def downcast_df(df):
@@ -69,20 +71,34 @@ def load_tasks(df, plugins, category):
return df


def export_files(df, name, compression):
filename = name.split('.')[0]
def export_files(df, options, page=None):
filename = options['name'].split('.')[0]

if options['partition']:
filename += '.' + str(page)

filepath = os.path.join(data_path, filename + '.dtypes.pickle')
filepath = os.path.join(output_path, filename + '.dtypes.pickle')
print('Creating dtypes file -> {}'.format(filepath))
save_dtypes(cache_dtypes(df), filepath)

filepath = os.path.join(data_path, filename + '.parquet')
filepath = os.path.join(output_path, filename + '.parquet')
print('Creating parquet file -> {}'.format(filepath))
save_df(df, filepath)

if compression:
print('Compressing parquet file -> {}'.format(filepath + '.7z'))
compress_df(filepath)

def compress_files(name):
filename = name.split('.')[0]
files = os.path.join(output_path, filename + '*.parquet')
filepath = os.path.join(output_path, filename + '.parquet')

print('Compressing parquet files -> {}'.format(filepath + '.7z'))
compress_df(filepath + '.7z', files)

files = os.path.join(output_path, filename + '*.dtypes.pickle')
filepath = os.path.join(output_path, filename + '.dtypes.pickle')

print('Compressing parquet files -> {}'.format(filepath + '.7z'))
compress_df(filepath + '.7z', files)


def process_df(df, plugins, verbose=True):
@@ -102,29 +118,29 @@ def process_df(df, plugins, verbose=True):
return df


def load_single(name, sep, usecols, parse_dates, plugins):
filepath = os.path.join(data_path, name)
def load_single(options):
filepath = os.path.join(data_path, options['name'])

if not os.path.exists(filepath):
raise IOError(ENOENT, 'File not found', filepath)

print('Loading data file -> {}'.format(filepath))

df = pd.read_csv(filepath, sep=sep, usecols=usecols,
parse_dates=parse_dates, quoting=3)
df = pd.read_csv(filepath, sep=options['sep'], usecols=options['usecols'],
parse_dates=options['parse_dates'], quoting=3)

return process_df(df, plugins)
return process_df(df, options['plugins'])


def load_multiple(name, sep, usecols, parse_dates, chunksize, plugins):
def load_multiple(options):
chunk_list = [] # append each chunk df here
filepath = os.path.join(data_path, name)
filepath = os.path.join(data_path, options['name'])

if not os.path.exists(filepath):
raise IOError(ENOENT, 'File not found', filepath)

df_chunk = pd.read_csv(filepath, sep=sep, usecols=usecols,
parse_dates=parse_dates, chunksize=chunksize, quoting=3)
df_chunk = pd.read_csv(filepath, sep=options['sep'], usecols=options['usecols'],
parse_dates=options['parse_dates'], chunksize=options['chunksize'], quoting=3)
step = 1
hr = '==========================================='

@@ -135,58 +151,75 @@ def load_multiple(name, sep, usecols, parse_dates, chunksize, plugins):
print('Performing chunk step[{}] {}'.format(step, hr))

# perform data filtering
chunk_filter = process_df(chunk, plugins)
chunk_filtered = process_df(chunk, options['plugins'])

# Once the data filtering is done, append the chunk to list
chunk_list.append(chunk_filter)
if not options['partition']:
# Once the data filtering is done, append the chunk to list
chunk_list.append(chunk_filtered)
else:
print('Exporting chunk to file')
chunk_filtered = load_tasks(chunk_filtered, options['plugins'], 'after')
export_files(chunk_filtered, options, step)

step += 1

print('Merging all processed chunks')
# concat the list into dataframe
df_concat = pd.concat(chunk_list)

return df_concat
if not options['partition']:
print('Merging all processed chunks')
# concat the list into dataframe
return pd.concat(chunk_list)


def convert_df(params):
df = None
sep = ';'
compression = True
usecols = None
plugins = None
chunksize = None
parse_dates = None

print('Parsed arguments: {}'.format(params))

options = {
'name': params['name'],
'sep': ';',
'chunksize': None,
'compression': True,
'partition': False,
'usecols': None,
'parse_dates': None,
'plugins': None
}

df = None

if 'sep' in params:
sep = params['sep']
options['sep'] = params['sep']
if 'compression' in params:
compression = params['compression']
options['compression'] = params['compression']
if 'partition' in params:
options['partition'] = params['partition']
if 'usecols' in params:
usecols = params['usecols']
options['usecols'] = params['usecols']
if 'chunksize' in params:
chunksize = params['chunksize']
options['chunksize'] = params['chunksize']
if 'parse_dates' in params:
parse_dates = params['parse_dates']
options['parse_dates'] = params['parse_dates']
if 'plugins' in params:
plugins = params['plugins']
options['plugins'] = params['plugins']

is_partitioned = (options['chunksize'] is not None) and options['partition']

if chunksize is None:
df = load_single(params['name'], sep, usecols, parse_dates, plugins)
if is_partitioned:
load_multiple(options)
else:
df = load_multiple(params['name'], sep, usecols, parse_dates, chunksize, plugins)
if options['chunksize'] is None:
df = load_single(options)
else:
df = load_multiple(options)

# Call after plugins
df = load_tasks(df, plugins, 'after')
df = load_tasks(df, options['plugins'], 'after')
export_files(df, options)

export_files(df, params['name'], compression)
if options['compression']:
compress_files(options['name'])


def main():
try:
files = [f for f in glob.glob(data_path + "**/*.yml", recursive=True)]
files = [f for f in glob.glob(config_path + "**/*.yml", recursive=True)]

for config in files:
with open(config) as f:
4 changes: 2 additions & 2 deletions app/utils.py
@@ -66,8 +66,8 @@ def save_df(df, path, compression='snappy', use_dictionary=True):
print(e)


def compress_df(filepath):
p = Popen(['7z', 'a', '-t7z', '-sdel', '--', filepath + '.7z', filepath],
def compress_df(filepath, files):
p = Popen(['7z', 'a', '-t7z', '-sdel', '--', filepath, files],
stdout=subprocess.PIPE, universal_newlines=True)
p.wait()
output,_ = p.communicate()
1 change: 1 addition & 0 deletions data/samples.yml.example → config/samples.yml.example
@@ -2,6 +2,7 @@
name: "samples.csv"
chunksize: 1000000
compression: true
partition: false
usecols:
- device_id
- timestamp
11 changes: 9 additions & 2 deletions docker-compose.yml
@@ -8,6 +8,13 @@ services:
args:
- UID=${UID}
- GID=${GID}
environment:
- DATA_DIR=${DATA_DIR}
- CONFIG_DIR=${CONFIG_DIR}
- PLUGIN_DIR=${PLUGIN_DIR}
- OUTPUT_DIR=${OUTPUT_DIR}
volumes:
- ./data:/home/user/data
- ./plugins:/home/user/plugins
- ${DATA_DIR}:/home/user/data
- ${CONFIG_DIR}:/home/user/config
- ${PLUGIN_DIR}:/home/user/plugins
- ${OUTPUT_DIR}:/home/user/output
2 changes: 1 addition & 1 deletion test.sh
@@ -2,4 +2,4 @@

docker-compose down
docker rmi dataset-converter:latest
docker-compose up
docker-compose up -d
