diff --git a/common/thorhelper/thorread.cpp b/common/thorhelper/thorread.cpp index 2b48cab2fcf..34d101bf0b1 100644 --- a/common/thorhelper/thorread.cpp +++ b/common/thorhelper/thorread.cpp @@ -2027,20 +2027,85 @@ void RemoteDiskRowReader::stop() { } +///--------------------------------------------------------------------------------------------------------------------- + +class DiskRowWriter : public CInterfaceOf, implements IDiskRowWriter +{ +public: + DiskRowWriter(IDiskReadMapping * _mapping) : mapping(_mapping) {} + IMPLEMENT_IINTERFACE_USING(CInterfaceOf); + + ILogicalRowSink * queryRowSink() override; + +protected: + Linked mapping; +}; + +ILogicalRowSink * DiskRowWriter::queryRowSink() +{ + return this; +} + +class BinaryDiskRowWriter : public DiskRowWriter +{ +public: + BinaryDiskRowWriter(IDiskReadMapping * _mapping) : DiskRowWriter(_mapping) {} + + bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) override; + + bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) override; + + void putRow(const void *row) override; // takes ownership of row. rename to putOwnedRow? 
+ void flush() override; + void noteStopped() override; + + void writeRow(const void *row) override; +}; + + +bool BinaryDiskRowWriter::matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) +{ + return strieq(format, "flat"); +} + +bool BinaryDiskRowWriter::setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) +{ + return false; +} + +void BinaryDiskRowWriter::putRow(const void *row) +{ + throwUnexpected(); +} + +void BinaryDiskRowWriter::flush() +{ + throwUnexpected(); +} + +void BinaryDiskRowWriter::noteStopped() +{ + throwUnexpected(); +} + +void BinaryDiskRowWriter::writeRow(const void *row) +{ + throwUnexpected(); +} ///--------------------------------------------------------------------------------------------------------------------- // Lookup to map the names of file types/formats to their object constructors; // map will be initialized within MODULE_INIT -static std::map> genericFileTypeMap; - +static std::map> genericFileTypeReadersMap; +static std::map> genericFileTypeWritersMap; // format is assumed to be lowercase IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping) { - auto foundReader = genericFileTypeMap.find(format); + auto foundReader = genericFileTypeReadersMap.find(format); - if (foundReader != genericFileTypeMap.end() && foundReader->second) + if (foundReader != genericFileTypeReadersMap.end() && foundReader->second) return foundReader->second(_mapping); UNIMPLEMENTED; @@ -2085,19 +2150,74 @@ MODULE_INIT(INIT_PRIORITY_STANDARD) // should be defined here; the key is the lowecase name of the format, // as will be used in ECL, and the value should be a lambda // that creates the appropriate disk row reader object - genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); }); - genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new 
CsvDiskRowReader(_mapping); }); - genericFileTypeMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); }); #ifdef _USE_PARQUET - genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); }); #else - genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; }); + genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; }); #endif // Stuff the file type names that were just instantiated into a list; // list will be accessed by the ECL compiler to validate the names // at compile time - for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++) + for (auto iter = genericFileTypeReadersMap.begin(); iter != genericFileTypeReadersMap.end(); iter++) + addAvailableGenericFileTypeName(iter->first.c_str()); + + return true; +} + +// format is assumed to be lowercase; throws UNIMPLEMENTED if no writer factory is registered for it +IDiskRowWriter * doCreateLocalDiskWriter(const char * format, IDiskReadMapping * _mapping) +{ + auto foundWriter = genericFileTypeWritersMap.find(format); + + if (foundWriter != genericFileTypeWritersMap.end() && foundWriter->second) + return foundWriter->second(_mapping); + + UNIMPLEMENTED; +} + +IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping) +{ + Owned directReader = doCreateLocalDiskWriter(format, mapping); + if (mapping->expectedMatchesProjected() 
|| strieq(format, "flat")) + return directReader.getClear(); + + // Owned expectedMapping = createUnprojectedMapping(mapping); + // Owned expectedReader = doCreateLocalDiskWriter(format, expectedMapping); + // return new AlternativeDiskRowWriter(directReader, expectedReader, mapping); + UNIMPLEMENTED; // translating writer not available yet: fail loudly instead of returning null, which callers dereference +} + +// IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * _mapping) +// { +// return new RemoteDiskRowWriter(format, _mapping); +// } + +IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * _mapping) +{ + // if (streamRemote) + // return createRemoteDiskWriter(format, _mapping); + // else + // return createLocalDiskWriter(format, _mapping); + + return createLocalDiskWriter(format, _mapping); +} + +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + // All pluggable file types that use the generic disk writer + // should be defined here; the key is the lowercase name of the format, + // as will be used in ECL, and the value should be a lambda + // that creates the appropriate disk row writer object + genericFileTypeWritersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowWriter(_mapping); }); + + // Stuff the file type names that were just instantiated into a list; + // list will be accessed by the ECL compiler to validate the names + // at compile time + for (auto iter = genericFileTypeWritersMap.begin(); iter != genericFileTypeWritersMap.end(); iter++) addAvailableGenericFileTypeName(iter->first.c_str()); return true; diff --git a/common/thorhelper/thorread.hpp b/common/thorhelper/thorread.hpp index 8179354c70b..604497add5d 100644 --- a/common/thorhelper/thorread.hpp +++ b/common/thorhelper/thorread.hpp @@ -68,18 +68,16 @@ THORHELPER_API IDiskReadMapping * createDiskReadMapping(RecordTranslationMode mo typedef IConstArrayOf FieldFilterArray; -interface IRowReader : extends IInterface -{ -public: - // get the interface for reading streams of row. 
outputAllocator can be null if allocating next is not used. - virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0; -}; interface ITranslator; class CLogicalFileSlice; -interface IDiskRowReader : extends IRowReader + +interface IDiskRowReader : extends IInterface { public: + // get the interface for reading streams of row. outputAllocator can be null if allocating next is not used. + virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0; + virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) = 0; //Specify where the raw binary input for a particular file is coming from, together with its actual format. @@ -91,9 +89,27 @@ interface IDiskRowReader : extends IRowReader virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) = 0; }; +interface IDiskRowWriter : extends IInterface +{ +public: + //Get the interface for writing streams of row. + virtual ILogicalRowSink * queryRowSink() = 0; + + + //MORE: Should be a disk write mapping + virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) = 0; + + //Specify where the data is being written to. I'm not sure that output options are needed (can be passed in the constructor) + virtual bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) = 0; +}; + + //Create a row reader for a thor binary file. The expected, projected, actual and options never change. The file providing the data can change. 
extern THORHELPER_API IDiskRowReader * createLocalDiskReader(const char * format, IDiskReadMapping * mapping); extern THORHELPER_API IDiskRowReader * createRemoteDiskReader(const char * format, IDiskReadMapping * mapping); extern THORHELPER_API IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskReadMapping * mapping); +extern THORHELPER_API IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping); +// extern THORHELPER_API IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * mapping); +extern THORHELPER_API IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * mapping); #endif diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index a14266c4c07..0686ccc2c87 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -53,7 +53,13 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI { case TAKdiskwrite: case TAKspillwrite: - return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph); + { + bool isGeneric = (((IHThorDiskWriteArg &)arg).getFlags() & TDXgeneric) != 0; // only opt in when the helper asks for the generic writer + if (isGeneric) + return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph, node); + else + return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph); + } case TAKsort: return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind, graph); case TAKdedup: diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index 4ccb5c9860c..ce6ea32a343 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -12003,6 +12003,122 @@ const void *CHThorGenericDiskReadActivity::nextRow() return NULL; } +CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, 
ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) +: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), inputOptions(LINK(_node)) // node is owned by the graph, so share it rather than adopt it +{ + +} + +void CHThorGenericDiskWriteBaseActivity::ready() +{ + CHThorActivityBase::ready(); +} + +void CHThorGenericDiskWriteBaseActivity::stop() +{ + CHThorActivityBase::stop(); +} + +void CHThorGenericDiskWriteBaseActivity::execute() +{ + CHThorActivityBase::execute(); +} + +IDiskRowWriter * CHThorGenericDiskWriteBaseActivity::ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options) +{ + Owned mapping = createDiskReadMapping(RecordTranslationMode::None, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options); + + ForEachItemIn(i, writers) + { + IDiskRowWriter & cur = writers.item(i); + if (cur.matches(format, streamRemote, mapping)) + return &cur; + } + IDiskRowWriter * writer = createDiskWriter(format, streamRemote, mapping); + writers.append(*writer); + return writer; +} + +CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) +: CHThorGenericDiskWriteBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg) +{ + +} + +// Overrides of the CHThorGenericDiskWriteBaseActivity lifecycle methods +void CHThorGenericDiskWriteActivity::ready() +{ + PARENT::ready(); + if (!activeWriter) + activeWriter = ensureRowWriter("flat", false, helper.getFormatCrc(), *helper.queryDiskRecordSize(), helper.getFormatCrc(), *helper.queryDiskRecordSize(), 0, *helper.queryDiskRecordSize(), inputOptions); + outSeq.set(activeWriter->queryRowSink()); // queryRowSink() returns an unlinked pointer (the writer itself, already owned by writers), so link it +} + +void CHThorGenericDiskWriteActivity::stop() +{ + 
PARENT::stop(); +} + +void CHThorGenericDiskWriteActivity::execute() +{ + // Loop thru the results + numRecords = 0; + while (next()) + numRecords++; + finishOutput(); +} + +const void * CHThorGenericDiskWriteActivity::getNext() +{ // through operation (writes and returns row) + // needs a one row lookahead to preserve group + if (!nextrow.get()) + { + nextrow.setown(input->nextRow()); + if (!nextrow.get()) + { + nextrow.setown(input->nextRow()); + if (nextrow.get()&&grouped) // only write eog if not at eof + outSeq->putRow(NULL); + return NULL; + } + } + outSeq->putRow(nextrow.getLink()); + checkSizeLimit(); + return nextrow.getClear(); +} + +bool CHThorGenericDiskWriteActivity::finishOutput() +{ + return true; +} + +bool CHThorGenericDiskWriteActivity::next() +{ + if (!nextrow.get()) + { + OwnedConstRoxieRow row(input->nextRow()); + if (!row.get()) + { + row.setown(input->nextRow()); + if (!row.get()) + return false; // we are done + if (grouped) + outSeq->putRow(NULL); + } + outSeq->putRow(row.getClear()); + } + else + outSeq->putRow(nextrow.getClear()); + checkSizeLimit(); + return true; +} + +void CHThorGenericDiskWriteActivity::checkSizeLimit() +{ + if (sizeLimit && (numRecords >= sizeLimit)) + throw MakeStringException(0, "Size limit exceeded"); +} + //===================================================================================================== MAKEFACTORY(DiskWrite); @@ -12070,6 +12186,11 @@ extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_a return new CHThorGenericDiskReadActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node); } +extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node) +{ + return new CHThorGenericDiskWriteActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node); +} + MAKEFACTORY(Null); MAKEFACTORY(SideEffect); 
MAKEFACTORY(Action); diff --git a/ecl/hthor/hthor.hpp b/ecl/hthor/hthor.hpp index 78644a7221a..9f2ede59cac 100644 --- a/ecl/hthor/hthor.hpp +++ b/ecl/hthor/hthor.hpp @@ -174,6 +174,7 @@ extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node); extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node); +extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node); extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node); extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node); diff --git a/ecl/hthor/hthor.ipp b/ecl/hthor/hthor.ipp index 3dc5695dd20..ae770538216 100644 --- a/ecl/hthor/hthor.ipp +++ b/ecl/hthor/hthor.ipp @@ -3241,6 +3241,58 @@ protected: void onLimitExceeded(); }; +class CHThorGenericDiskWriteBaseActivity : public CHThorActivityBase/*, implements IThorDiskCallback, implements IIndexWriteContext, public IFileCollectionContext*/ +{ +protected: + IHThorDiskWriteArg &helper; + IDiskRowWriter * activeWriter = nullptr; + CLogicalFileCollection files; + IArrayOf writers; + Owned inputOptions; + bool outputGrouped = false; + OwnedConstRoxieRow nextrow; // needed for grouped spill + +public: + 
CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node); + IMPLEMENT_IINTERFACE_USING(CHThorActivityBase) + + virtual void ready(); + virtual void stop(); + virtual void execute(); + + //interface IHThorInput + virtual bool isGrouped() { return outputGrouped; } + virtual IOutputMetaData * queryOutputMeta() const { return outputMeta; } + + protected: + IDiskRowWriter * ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options); +}; + +class CHThorGenericDiskWriteActivity : public CHThorGenericDiskWriteBaseActivity +{ + typedef CHThorGenericDiskWriteBaseActivity PARENT; +protected: + IHThorDiskWriteArg &helper; + bool grouped = false; // read by next()/getNext(); the constructor does not set it + Owned outSeq; + unsigned __int64 numRecords = 0; + offset_t sizeLimit = 0; // 0 means no limit in checkSizeLimit() + + bool finishOutput(); + bool next(); + const void * getNext(); + void checkSizeLimit(); +public: + IMPLEMENT_SINKACTIVITY + + CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node); + + virtual void ready(); + virtual void stop(); + virtual void execute(); + virtual bool needsAllocator() const { return true; } +}; + #define MAKEFACTORY(NAME) \ extern HTHOR_API IHThorActivity * create ## NAME ## Activity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThor ## NAME ## Arg &arg, ThorActivityKind kind, EclGraph & _graph) \ diff --git a/system/jlib/jrowstream.hpp b/system/jlib/jrowstream.hpp index 8c60f44cb45..5a99a45a6fc 100644 --- a/system/jlib/jrowstream.hpp +++ b/system/jlib/jrowstream.hpp @@ -48,7 +48,7 @@ class MemoryBuffer; class MemoryBufferBuilder; //An interface for reading rows - which can request the row in 
the most efficient way for the caller. -interface IDiskRowStream : extends IRowStream +interface ILogicalRowStream : extends IRowStream { // Defined in IRowStream, here for documentation: // Request a row which is owned by the caller, and must be freed once it is finished with. @@ -74,6 +74,19 @@ interface IDiskRowStream : extends IRowStream virtual const void *nextRow(MemoryBufferBuilder & builder)=0; // rows returned are created in the target buffer. This should be generalized to an ARowBuilder }; +using IDiskRowStream = ILogicalRowStream; // MORE: Replace these in the code, but alias for now to avoid compile problems + + +//An interface for writing rows - with separate functions whether or not ownership of the row is being passed +interface ILogicalRowSink : extends IRowWriterEx +{ +// Defined in IRowWriterEx, here for documentation: + virtual void putRow(const void *row) override = 0; // takes ownership of row. rename to putOwnedRow? + virtual void flush() override = 0; + virtual void noteStopped() override = 0; + + virtual void writeRow(const void *row) = 0; // does not take ownership of row +}; extern jlib_decl IDiskRowStream * queryNullDiskRowStream(); diff --git a/testing/regress/ecl/genericActivities.ecl b/testing/regress/ecl/genericActivities.ecl new file mode 100644 index 00000000000..df7b0f18ca6 --- /dev/null +++ b/testing/regress/ecl/genericActivities.ecl @@ -0,0 +1,4 @@ +OUTPUT(DATASET([{1}], {UNSIGNED1 a}), ,'data', OVERWRITE); + +d := DATASET('data', {UNSIGNED1 a}, TYPE(FLAT)); +OUTPUT(d); \ No newline at end of file