Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30787 #19168

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 130 additions & 10 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2027,20 +2027,85 @@ void RemoteDiskRowReader::stop()
{
}

///---------------------------------------------------------------------------------------------------------------------

class DiskRowWriter : public CInterfaceOf<ILogicalRowSink>, implements IDiskRowWriter
{
public:
DiskRowWriter(IDiskReadMapping * _mapping) : mapping(_mapping) {}
IMPLEMENT_IINTERFACE_USING(CInterfaceOf<ILogicalRowSink>);

ILogicalRowSink * queryRowSink() override;

protected:
Linked<IDiskReadMapping> mapping;
};

ILogicalRowSink * DiskRowWriter::queryRowSink()
{
return this;
}

class BinaryDiskRowWriter : public DiskRowWriter
{
public:
BinaryDiskRowWriter(IDiskReadMapping * _mapping) : DiskRowWriter(_mapping) {}

bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) override;

bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) override;

void putRow(const void *row) override; // takes ownership of row. rename to putOwnedRow?
void flush() override;
void noteStopped() override;

void writeRow(const void *row) override;
};


bool BinaryDiskRowWriter::matches(const char * format, bool streamRemote, IDiskReadMapping * mapping)
{
return strieq(format, "flat");
}

bool BinaryDiskRowWriter::setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions)
{
return false;
}

void BinaryDiskRowWriter::putRow(const void *row)
{
throwUnexpected();
}

void BinaryDiskRowWriter::flush()
{
throwUnexpected();
}

void BinaryDiskRowWriter::noteStopped()
{
throwUnexpected();
}

void BinaryDiskRowWriter::writeRow(const void *row)
{
throwUnexpected();
}

///---------------------------------------------------------------------------------------------------------------------

// Lookup to map the names of file types/formats to their object constructors;
// map will be initialized within MODULE_INIT
static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeMap;

static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeReadersMap;
static std::map<std::string, std::function<DiskRowWriter*(IDiskReadMapping*)>> genericFileTypeWritersMap;

// format is assumed to be lowercase
IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping)
{
auto foundReader = genericFileTypeMap.find(format);
auto foundReader = genericFileTypeReadersMap.find(format);

if (foundReader != genericFileTypeMap.end() && foundReader->second)
if (foundReader != genericFileTypeReadersMap.end() && foundReader->second)
return foundReader->second(_mapping);

UNIMPLEMENTED;
Expand Down Expand Up @@ -2085,19 +2150,74 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
// should be defined here; the key is the lowecase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row reader object
genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
genericFileTypeMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); });
#ifdef _USE_PARQUET
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
#else
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; });
genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; });
#endif

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++)
for (auto iter = genericFileTypeReadersMap.begin(); iter != genericFileTypeReadersMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
}

// format is assumed to be lowercase
IDiskRowWriter * doCreateLocalDiskWriter(const char * format, IDiskReadMapping * _mapping)
{
auto foundReader = genericFileTypeWritersMap.find(format);

if (foundReader != genericFileTypeWritersMap.end() && foundReader->second)
return foundReader->second(_mapping);

UNIMPLEMENTED;
}

IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping)
{
Owned<IDiskRowWriter> directReader = doCreateLocalDiskWriter(format, mapping);
if (mapping->expectedMatchesProjected() || strieq(format, "flat"))
return directReader.getClear();

// Owned<IDiskReadMapping> expectedMapping = createUnprojectedMapping(mapping);
// Owned<IDiskRowWriter> expectedReader = doCreateLocalDiskWriter(format, expectedMapping);
// return new AlternativeDiskRowWriter(directReader, expectedReader, mapping);
return nullptr;
}

// IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * _mapping)
// {
// return new RemoteDiskRowWriter(format, _mapping);
// }

IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * _mapping)
{
// if (streamRemote)
// return createRemoteDiskWriter(format, _mapping);
// else
// return createLocalDiskWriter(format, _mapping);

return createLocalDiskWriter(format, _mapping);
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
// All pluggable file types that use the generic disk reader
// should be defined here; the key is the lowecase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row reader object
genericFileTypeWritersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowWriter(_mapping); });

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeWritersMap.begin(); iter != genericFileTypeWritersMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
Expand Down
30 changes: 23 additions & 7 deletions common/thorhelper/thorread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,16 @@ THORHELPER_API IDiskReadMapping * createDiskReadMapping(RecordTranslationMode mo


typedef IConstArrayOf<IFieldFilter> FieldFilterArray;
interface IRowReader : extends IInterface
{
public:
// get the interface for reading streams of row. outputAllocator can be null if allocating next is not used.
virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0;
};

interface ITranslator;
class CLogicalFileSlice;
interface IDiskRowReader : extends IRowReader

interface IDiskRowReader : extends IInterface
{
public:
// get the interface for reading streams of row. outputAllocator can be null if allocating next is not used.
virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0;

virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) = 0;

//Specify where the raw binary input for a particular file is coming from, together with its actual format.
Expand All @@ -91,9 +89,27 @@ interface IDiskRowReader : extends IRowReader
virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) = 0;
};

interface IDiskRowWriter : extends IInterface
{
public:
//Get the interface for writing streams of row.
virtual ILogicalRowSink * queryRowSink() = 0;


//MORE: Should be a disk write mapping
virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) = 0;

//Specify where the data is being written to. I'm not sure that output options are needed (can be passed in the constructor)
virtual bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) = 0;
};


//Create a row reader for a thor binary file. The expected, projected, actual and options never change. The file providing the data can change.
extern THORHELPER_API IDiskRowReader * createLocalDiskReader(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowReader * createRemoteDiskReader(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskReadMapping * mapping);

extern THORHELPER_API IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping);
// extern THORHELPER_API IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * mapping);
#endif
8 changes: 7 additions & 1 deletion ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
{
case TAKdiskwrite:
case TAKspillwrite:
return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph);
{
// bool isGeneric = (((IHThorNewDiskReadArg &)arg).getFlags() & TDXgeneric) != 0;
if (1)
return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph, node);
else
return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph);
}
case TAKsort:
return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind, graph);
case TAKdedup:
Expand Down
121 changes: 121 additions & 0 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12003,6 +12003,122 @@ const void *CHThorGenericDiskReadActivity::nextRow()
return NULL;
}

CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), inputOptions(_node)
{

}

void CHThorGenericDiskWriteBaseActivity::ready()
{
CHThorActivityBase::ready();
}

void CHThorGenericDiskWriteBaseActivity::stop()
{
CHThorActivityBase::stop();
}

void CHThorGenericDiskWriteBaseActivity::execute()
{
CHThorActivityBase::execute();
}

IDiskRowWriter * CHThorGenericDiskWriteBaseActivity::ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options)
{
Owned<IDiskReadMapping> mapping = createDiskReadMapping(RecordTranslationMode::None, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options);

ForEachItemIn(i, writers)
{
IDiskRowWriter & cur = writers.item(i);
if (cur.matches(format, streamRemote, mapping))
return &cur;
}
IDiskRowWriter * writer = createDiskWriter(format, streamRemote, mapping);
writers.append(*writer);
return writer;
}

CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorGenericDiskWriteBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg)
{

}

// Implement all pure virtual functions from CHThorGenericDiskWriteBaseActivity
void CHThorGenericDiskWriteActivity::ready()
{
PARENT::ready();
if (!activeWriter)
activeWriter = ensureRowWriter("flat", false, helper.getFormatCrc(), *helper.queryDiskRecordSize(), helper.getFormatCrc(), *helper.queryDiskRecordSize(), 0, *helper.queryDiskRecordSize(), inputOptions);
outSeq.setown(activeWriter->queryRowSink());
}

void CHThorGenericDiskWriteActivity::stop()
{
PARENT::stop();
}

void CHThorGenericDiskWriteActivity::execute()
{
// Loop thru the results
numRecords = 0;
while (next())
numRecords++;
finishOutput();
}

const void * CHThorGenericDiskWriteActivity::getNext()
{ // through operation (writes and returns row)
// needs a one row lookahead to preserve group
if (!nextrow.get())
{
nextrow.setown(input->nextRow());
if (!nextrow.get())
{
nextrow.setown(input->nextRow());
if (nextrow.get()&&grouped) // only write eog if not at eof
outSeq->putRow(NULL);
return NULL;
}
}
outSeq->putRow(nextrow.getLink());
checkSizeLimit();
return nextrow.getClear();
}

bool CHThorGenericDiskWriteActivity::finishOutput()
{
return true;
}

bool CHThorGenericDiskWriteActivity::next()
{
if (!nextrow.get())
{
OwnedConstRoxieRow row(input->nextRow());
if (!row.get())
{
row.setown(input->nextRow());
if (!row.get())
return false; // we are done
if (grouped)
outSeq->putRow(NULL);
}
outSeq->putRow(row.getClear());
}
else
outSeq->putRow(nextrow.getClear());
checkSizeLimit();
return true;
}

void CHThorGenericDiskWriteActivity::checkSizeLimit()
{
if (sizeLimit && (numRecords >= sizeLimit))
throw MakeStringException(0, "Size limit exceeded");
}

//=====================================================================================================

MAKEFACTORY(DiskWrite);
Expand Down Expand Up @@ -12070,6 +12186,11 @@ extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_a
return new CHThorGenericDiskReadActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node);
}

extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node)
{
return new CHThorGenericDiskWriteActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node);
}

MAKEFACTORY(Null);
MAKEFACTORY(SideEffect);
MAKEFACTORY(Action);
Expand Down
1 change: 1 addition & 0 deletions ecl/hthor/hthor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext

extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);
extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);

extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
Expand Down
Loading
Loading