Skip to content

Commit

Permalink
ODB-538: First pass at exposing SQL functionality in odc
Browse files Browse the repository at this point in the history
  • Loading branch information
simondsmart committed Nov 9, 2022
1 parent 53aa9b7 commit ecd3690
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 14 deletions.
41 changes: 32 additions & 9 deletions src/odc/api/Odb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,6 @@ void ReaderImpl::restart() {

Frame ReaderImpl::next() {

std::vector<core::Table> tables;

if (it_ == tablesReader_.end()) return Frame();

if (!first_) {
Expand All @@ -146,7 +144,7 @@ Frame ReaderImpl::next() {
}

first_ = false;
tables.emplace_back(*it_);
std::vector<core::Table> tables{*it_};
long nrows = tables.back().rowCount();

if (aggregated_) {
Expand Down Expand Up @@ -202,14 +200,39 @@ const std::map<std::string, std::string>& FrameImpl::properties() const {

// API Forwarding

Reader::Reader(const std::string& path, bool aggregated, long rowlimit) :
impl_(new ReaderImpl(path, aggregated, rowlimit)) {}
/// @note A more memory-efficient approach to SQL would be to derive a second type of ReaderImpl which only
/// iterates through the filtered data handle as needed, and then applies the aggregation logic to the components

/// Construct a reader over the file at `path`, optionally pre-filtered by an SQL query.
///
/// With no query (null or empty `sql`) the path is forwarded directly to ReaderImpl.
/// Otherwise the whole file is run through ::odc::api::filter() into an in-memory
/// handle first, and ReaderImpl is built on top of that filtered copy.
///
/// @param path        File path to open
/// @param aggregated  Forwarded unchanged to ReaderImpl
/// @param rowlimit    Forwarded unchanged to ReaderImpl
/// @param sql         SQL query to pre-filter by; null or "" means no filtering
Reader::Reader(const std::string& path, bool aggregated, long rowlimit, const char* sql) {

    const bool unfiltered = (sql == nullptr) || (*sql == '\0');
    if (unfiltered) {
        impl_.reset(new ReaderImpl(path, aggregated, rowlimit));
        return;
    }

    // Both handles are owned locally while the filter runs, so neither leaks if it throws.
    std::unique_ptr<DataHandle> source(eckit::PathName(path).fileHandle());
    std::unique_ptr<MemoryHandle> buffered(new MemoryHandle);
    ::odc::api::filter(sql, *source, *buffered);

    // The filtered buffer is released as a raw pointer; ReaderImpl is expected to own it
    // from here on (NOTE(review): confirm against the ReaderImpl pointer constructor).
    impl_.reset(new ReaderImpl(buffered.release(), aggregated, rowlimit));
}

Reader::Reader(eckit::DataHandle& dh, bool aggregated, long rowlimit) :
impl_(new ReaderImpl(dh, aggregated, rowlimit)) {}
/// Construct a reader over a caller-owned data handle, optionally pre-filtered by an SQL query.
///
/// `dh` is only borrowed. With no query (null or empty `sql`) it is forwarded to
/// ReaderImpl by reference. Otherwise its contents are run through ::odc::api::filter()
/// into an in-memory handle, and ReaderImpl is built on that filtered copy instead.
///
/// @param dh          Data handle to read from (not owned)
/// @param aggregated  Forwarded unchanged to ReaderImpl
/// @param rowlimit    Forwarded unchanged to ReaderImpl
/// @param sql         SQL query to pre-filter by; null or "" means no filtering
Reader::Reader(eckit::DataHandle& dh, bool aggregated, long rowlimit, const char* sql) {

    const bool unfiltered = (sql == nullptr) || (*sql == '\0');
    if (unfiltered) {
        impl_.reset(new ReaderImpl(dh, aggregated, rowlimit));
        return;
    }

    // Hold the output buffer in a unique_ptr so it is freed if the filter throws.
    std::unique_ptr<MemoryHandle> buffered(new MemoryHandle);
    ::odc::api::filter(sql, dh, *buffered);

    // The filtered buffer is released as a raw pointer; ReaderImpl is expected to own it
    // from here on (NOTE(review): confirm against the ReaderImpl pointer constructor).
    impl_.reset(new ReaderImpl(buffered.release(), aggregated, rowlimit));
}

Reader::Reader(eckit::DataHandle* dh, bool aggregated, long rowlimit) :
impl_(new ReaderImpl(dh, aggregated, rowlimit)) {}
/// Construct a reader from a data handle pointer, taking ownership of it, optionally
/// pre-filtered by an SQL query.
///
/// With no query (null or empty `sql`) the handle is passed straight on to ReaderImpl.
/// Otherwise its contents are run through ::odc::api::filter() into an in-memory
/// handle, the original handle is destroyed, and ReaderImpl is given the filtered copy.
///
/// @param dh          Data handle to read from; ownership is transferred to this Reader
/// @param aggregated  Forwarded unchanged to ReaderImpl
/// @param rowlimit    Forwarded unchanged to ReaderImpl
/// @param sql         SQL query to pre-filter by; null or "" means no filtering
Reader::Reader(eckit::DataHandle* dh, bool aggregated, long rowlimit, const char* sql) {

    // Ownership of dh has already passed to us, so guard it straight away: if allocating the
    // buffer or running the filter throws, the handle must still be destroyed, not leaked.
    std::unique_ptr<eckit::DataHandle> owned(dh);

    if (sql && ::strlen(sql) > 0) {
        std::unique_ptr<MemoryHandle> filtered_dh(new MemoryHandle);
        ::odc::api::filter(sql, *owned, *filtered_dh);
        // Destroys the original handle and adopts the filtered, in-memory one in its place.
        owned.reset(filtered_dh.release());
    }

    // Hand the (possibly replaced) handle on as a raw pointer, as the original code did.
    impl_.reset(new ReaderImpl(owned.release(), aggregated, rowlimit));
}

// Destructor has no custom logic; it remains defined out of line in this translation unit.
Reader::~Reader() = default;

Expand Down
9 changes: 6 additions & 3 deletions src/odc/api/Odb.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,24 @@ class Reader {
* \param path File path to open
* \param aggregated Whether to aggregate compatible data into a logical frame
* \param rowlimit Maximum number of rows to aggregate into one logical frame
* \param sqlfilter An SQL query to pre-filter the data by. Note this will significantly increase memory usage
*/
Reader(const std::string& path, bool aggregated=true, long rowlimit=-1);
Reader(const std::string& path, bool aggregated=true, long rowlimit=-1, const char* sqlfilter=nullptr);
/** Construct from data handle reference. This does not take ownership of the data handle,
* and managing the lifetime of this data handle is the responsibility of the caller.
* \param dh Data handle (eckit)
* \param aggregated Whether to aggregate compatible data into a logical frame
* \param rowlimit Maximum number of rows to aggregate into one logical frame
* \param sqlfilter An SQL query to pre-filter the data by. Note this will significantly increase memory usage
*/
Reader(eckit::DataHandle& dh, bool aggregated=true, long rowlimit=-1);
Reader(eckit::DataHandle& dh, bool aggregated=true, long rowlimit=-1, const char* sqlfilter=nullptr);
/** Construct via data handle pointer. This takes ownership of the DataHandle.
* \param dh Data handle (eckit)
* \param aggregated Whether to aggregate compatible data into a logical frame
* \param rowlimit Maximum number of rows to aggregate into one logical frame
* \param sqlfilter An SQL query to pre-filter the data by. Note this will significantly increase memory usage
*/
Reader(eckit::DataHandle* dh, bool aggregated=true, long rowlimit=-1); // takes ownership
Reader(eckit::DataHandle* dh, bool aggregated=true, long rowlimit=-1, const char* sqlfilter=nullptr); // takes ownership
~Reader();

/** Advances to the next frame in the stream
Expand Down
18 changes: 16 additions & 2 deletions src/odc/api/odc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ struct odc_reader_t {
~odc_reader_t() noexcept(false) {
dh_->close();
}
std::string sql_;
std::unique_ptr<Reader> impl_;
std::unique_ptr<DataHandle> dh_;
};
Expand Down Expand Up @@ -324,6 +325,18 @@ int odc_open_stream(odc_reader_t** reader, void* handle, odc_stream_read_t strea
});
}

/// Record (or clear) the SQL filter to be applied when the reader starts decoding.
///
/// The query is only stored here. It is consumed when the underlying Reader is
/// constructed lazily on the first frame read, so it must be set before then.
///
/// @param reader  Reader instance to apply the filter to (must not be null)
/// @param sql     SQL filter; null clears any previously stored filter
/// @returns       Return code produced by wrapApiFunction
int odc_apply_filter(odc_reader_t* reader, const char* sql) {

    return wrapApiFunction([reader, sql] {
        // Validate the handle before dereferencing it, as the other reader entry points do.
        ASSERT(reader);
        // Once the Reader has been constructed the filter can no longer take effect.
        ASSERT(!reader->impl_);
        reader->sql_ = sql ? sql : "";
    });
}

int odc_close(const odc_reader_t* reader) {
return wrapApiFunction([reader]{
ASSERT(reader);
Expand Down Expand Up @@ -358,7 +371,7 @@ int odc_next_frame(odc_frame_t* frame) {
odc_reader_t& r(frame->reader_);
if (!r.impl_) {
bool aggregated = false;
r.impl_.reset(new Reader(*r.dh_, aggregated));
r.impl_.reset(new Reader(*r.dh_, aggregated, -1, r.sql_.empty() ? nullptr : r.sql_.c_str()));
}

if ((frame->frame_ = r.impl_->next())) {
Expand All @@ -379,7 +392,8 @@ int odc_next_frame_aggregated(odc_frame_t* frame, long maximum_rows) {
odc_reader_t& r(frame->reader_);
if (!r.impl_) {
bool aggregated = true;
r.impl_.reset(new Reader(*r.dh_, aggregated, maximum_rows));
r.impl_.reset(new Reader(*r.dh_, aggregated, maximum_rows,
r.sql_.empty() ? nullptr : r.sql_.c_str()));
}

if ((frame->frame_ = r.impl_->next())) {
Expand Down
9 changes: 9 additions & 0 deletions src/odc/api/odc.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ typedef long (*odc_stream_read_t)(void* context, void* buffer, long length);
*/
int odc_open_stream(odc_reader_t** reader, void* context, odc_stream_read_t stream_proc);

/** Specify an SQL filter to apply to the specified data stream prior to decoding
* \note This comes with memory and performance overheads, as general SQL statements can require processing of the
* entire ODB-2 stream prior to outputting aggregated values.
* \note The filter must be specified before the first frame is read from the reader (i.e. before the first call to
* odc_next_frame() or odc_next_frame_aggregated()). Once decoding has started the call is rejected.
* \param reader Reader instance to apply filter to
* \param sql SQL filter (SELECT statement) to apply to the specified data stream. Passing NULL or an empty string
* means that no filter is applied
* \returns Return code (#OdcErrorValues)
*/
int odc_apply_filter(odc_reader_t* reader, const char* sql);

/** Closes opened resource and destroys the reader
* \param reader Reader instance
* \returns Return code (#OdcErrorValues)
Expand Down

0 comments on commit ecd3690

Please sign in to comment.