Closes #3219: Optimize old Parquet string read code #3220

Draft · wants to merge 15 commits into base: master
8 changes: 5 additions & 3 deletions benchmarks/IO.py
@@ -87,7 +87,8 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, fileFormat,
     else:
         raise ValueError("Invalid file format")
 
-    nb = a.size * a.itemsize * numfiles
+    itemsize = 4 if dtype == 'str' else a.itemsize
+    nb = a.size * itemsize * numfiles
     for key in times.keys():
         print("write Average time {} = {:.4f} sec".format(key, times[key]))
         print("write Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
@@ -139,8 +140,9 @@ def time_ak_read(N_per_locale, numfiles, trials, dtype, path, fileFormat, comps=
times["CSV"] = sum(readtimes) / trials
else:
raise ValueError("Invalid file format")

nb = a.size * a.itemsize

itemsize = 4 if dtype == 'str' else a.itemsize
nb = a.size * itemsize
for key in times.keys():
print("read Average time {} = {:.4f} sec".format(key, times[key]))
print("read Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
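Both hunks above make the same correction: a fixed `a.itemsize` does not describe variable-length string data, so the old rate math misreported throughput for `str` benchmarks. A minimal sketch of the corrected calculation, assuming (as the diff's hardcoded constant does) a flat 4 bytes per string element; `a`, `dtype`, `numfiles`, and `elapsed` are hypothetical stand-ins for values built earlier in IO.py:

```python
def rate_gib_per_sec(a, dtype, numfiles, elapsed):
    # Strings have no fixed width, so charge a flat 4 bytes per element;
    # numeric dtypes keep their true per-element width.
    itemsize = 4 if dtype == 'str' else a.itemsize
    nb = a.size * itemsize * numfiles  # total bytes moved across all files
    return nb / 2**30 / elapsed        # bytes -> GiB, divided by seconds
```

For example, 10**8 four-byte elements written to a single file in 2 seconds reports (4 * 10**8 / 2**30) / 2 ≈ 0.19 GiB/sec.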
34 changes: 24 additions & 10 deletions src/ArrowFunctions.cpp
@@ -827,23 +827,37 @@ int cpp_readColumnByName(const char* filename, void* chpl_arr, bool* where_null_
       }
     }
   } else if(ty == ARROWSTRING) {
-    int16_t definition_level; // nullable type and only reading single records in batch
     auto chpl_ptr = (unsigned char*)chpl_arr;
     parquet::ByteArrayReader* reader =
       static_cast<parquet::ByteArrayReader*>(column_reader.get());
 
-    while (reader->HasNext()) {
-      parquet::ByteArray value;
-      (void)reader->ReadBatch(1, &definition_level, nullptr, &value, &values_read);
-      // if values_read is 0, that means that it was a null value
-      if(values_read > 0) {
-        for(int j = 0; j < value.len; j++) {
-          chpl_ptr[i] = value.ptr[j];
-          i++;
-        }
-      }
-      i++; // skip one space so the strings are null terminated with a 0
-    }
+    int totalProcessed = 0;
+    std::vector<parquet::ByteArray> values(batchSize);
+    while (reader->HasNext() && totalProcessed < numElems) {
+      std::vector<int16_t> definition_levels(batchSize, -1);
+      if((numElems - totalProcessed) < batchSize) // adjust batchSize if needed
+        batchSize = numElems - totalProcessed;
+
+      (void)reader->ReadBatch(batchSize, definition_levels.data(), nullptr, values.data(), &values_read);
+      totalProcessed += values_read;
+      int j = 0;
+      int numProcessed = 0;
+      while(j < batchSize) {
+        if(definition_levels[j] == 1) {
+          for(int k = 0; k < values[numProcessed].len; k++) {
+            chpl_ptr[i] = values[numProcessed].ptr[k];
+            i++;
+          }
+          i++; // skip one space so the strings are null terminated with a 0
+          numProcessed++;
+        } else if(definition_levels[j] == 0) {
+          i++;
+        } else {
+          j = batchSize; // exit loop, not read
+        }
+        j++;
+      }
+    }
   } else if(ty == ARROWFLOAT) {
     int16_t definition_level; // nullable type and only reading single records in batch
     auto chpl_ptr = (double*)chpl_arr;
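The rewrite above is the heart of the optimization: instead of one `ReadBatch(1, ...)` call (with its per-call overhead) for every value, the reader pulls up to `batchSize` values per call, clamping the final batch to the `numElems` remaining. `ReadBatch` reports one definition level per logical slot (1 = value present, 0 = null) but packs only the non-null byte arrays into `values`, which is why the loop keeps separate `j`/`numProcessed` cursors. A pure-Python model of that unpacking step — not the PR's code, just an illustration of the same copy/terminate logic, with `-1` standing for a slot the reader never filled:

```python
def unpack_batch(definition_levels, values, out, i):
    """Copy packed non-null byte strings into flat buffer `out` starting at
    offset `i`, null-terminating every logical entry; returns the new offset."""
    num_processed = 0                  # cursor into the packed (non-null) values
    for level in definition_levels:
        if level == 1:                 # value present: copy its bytes
            for byte in values[num_processed]:
                out[i] = byte
                i += 1
            i += 1                     # leave one 0 byte as the terminator
            num_processed += 1
        elif level == 0:               # null entry: terminator byte only
            i += 1
        else:                          # -1 sentinel: batch ended early
            break
    return i

# Three logical slots -- "hi", a null, "yo" -- laid out as "hi\0", "\0", "yo\0".
buf = bytearray(16)
end = unpack_batch([1, 0, 1], [b"hi", b"yo"], buf, 0)
assert bytes(buf[:end]) == b"hi\x00\x00yo\x00"
```

Batching trades a little memory (the `values` and `definition_levels` vectors) for far fewer calls into the Parquet column reader, which is presumably where the old one-record-at-a-time loop spent most of its time.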
53 changes: 4 additions & 49 deletions src/ParquetMsg.chpl
@@ -979,58 +979,13 @@ module ParquetMsg {
       }
       rnames.pushBack((dsetname, ObjType.PDARRAY, valName));
     } else if ty == ArrowTypes.stringArr {
-      /*
-        1. create a block distributed files array (locale owner reads file)
-        2. get number of row groups so we know how much data we have to store
-        3. create array to store data (2D array with same distribution dist files)
-        4. go distributed and create readers for each file
-      */
-      extern proc c_getNumRowGroups(readerIdx): c_int;
-      extern proc c_openFile(filename, idx);
-      extern proc c_createRowGroupReader(rowGroup, readerIdx);
-      extern proc c_createColumnReader(colname, readerIdx);
-      extern proc c_freeMapValues(rowToFree);
-      extern proc c_readParquetColumnChunks(filename, batchSize,
-                                            numElems, readerIdx, numRead,
-                                            externalData, defLevels, errMsg): int;
-
       var entrySeg = createSymEntry(len, int);
 
-      var distFiles = makeDistArray(filenames);
-      var numRowGroups: [distFiles.domain] int;
-
-      var maxRowGroups = getRowGroupNums(distFiles, numRowGroups);
-      var externalData: [distFiles.domain] [0..#maxRowGroups] c_ptr(void);
-      var containsNulls: [distFiles.domain] [0..#maxRowGroups] bool;
-      var valsRead: [distFiles.domain] [0..#maxRowGroups] int;
-      var bytesPerRG: [distFiles.domain] [0..#maxRowGroups] int;
-      var startIdxs: [distFiles.domain] [0..#maxRowGroups] int; // correspond to starting idx in entrySeg
-
-      fillSegmentsAndPersistData(distFiles, entrySeg, externalData, containsNulls, valsRead, dsetname, sizes, len, numRowGroups, bytesPerRG, startIdxs);
-
-      var (rgSubdomains, totalBytes) = getRGSubdomains(bytesPerRG, maxRowGroups);
-
-      var entryVal;
-      if containsNulls[0][0] {
-        byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, sizes, dsetname);
-        entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
-        entryVal = createSymEntry((+ reduce byteSizes), uint(8));
-        readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
-      } else {
-        entryVal = createSymEntry(totalBytes, uint(8));
-        entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
-        copyValuesFromC(entryVal, distFiles, externalData, valsRead, numRowGroups, rgSubdomains, maxRowGroups, sizes, entrySeg.a, startIdxs);
-      }
-
-      for i in externalData.domain {
-        for j in externalData[i].domain {
-          if valsRead[i][j] > 0 then
-            on externalData[i][j] do
-              c_freeMapValues(externalData[i][j]);
-        }
-      }
+      byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, sizes, dsetname);
+      entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
+
+      var entryVal = createSymEntry((+ reduce byteSizes), uint(8));
+      readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
 
       var stringsEntry = assembleSegStringFromParts(entrySeg, entryVal, st);
       rnames.pushBack((dsetname, ObjType.STRINGS, "%s+%?".doFormat(stringsEntry.name, stringsEntry.nBytes)));
     } else if ty == ArrowTypes.double || ty == ArrowTypes.float {
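With the C++ side now handling nulls and batching directly, the Chapel driver no longer needs the row-group persistence machinery or the separate nulls/no-nulls branches: it always computes per-string byte sizes, converts them to segment offsets, and reads the bytes in one pass. The `(+ scan entrySeg.a) - entrySeg.a` idiom is an exclusive prefix sum: each string's starting offset is the total size of everything before it. A NumPy sketch of the same transformation, with made-up sizes:

```python
import numpy as np

# entrySeg.a starts as per-string byte counts; the Chapel line
#   entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
# rewrites them in place as starting offsets (an exclusive prefix sum).
sizes = np.array([3, 1, 4])          # hypothetical per-string byte sizes
offsets = np.cumsum(sizes) - sizes   # inclusive scan minus self = exclusive scan
print(offsets)                       # -> [0 3 4]
```

String k then occupies bytes offsets[k] through offsets[k] + sizes[k] - 1 of the flat values buffer that readStrFilesByName fills.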