Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.2.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>

# Conflicts:
#	helm/hpcc/Chart.yaml
#	helm/hpcc/templates/_helpers.tpl
#	helm/hpcc/templates/dafilesrv.yaml
#	helm/hpcc/templates/dali.yaml
#	helm/hpcc/templates/dfuserver.yaml
#	helm/hpcc/templates/eclagent.yaml
#	helm/hpcc/templates/eclccserver.yaml
#	helm/hpcc/templates/eclscheduler.yaml
#	helm/hpcc/templates/esp.yaml
#	helm/hpcc/templates/localroxie.yaml
#	helm/hpcc/templates/roxie.yaml
#	helm/hpcc/templates/sasha.yaml
#	helm/hpcc/templates/thor.yaml
#	version.cmake
  • Loading branch information
ghalliday committed Sep 6, 2023
2 parents 0ef5fbe + b876494 commit 4a4e8dc
Show file tree
Hide file tree
Showing 31 changed files with 660 additions and 450 deletions.
319 changes: 3 additions & 316 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <unordered_set>

#include "jlib.hpp"
#include "jcontainerized.hpp"
#include "workunit.hpp"
#include "jprop.hpp"
#include "jmisc.hpp"
Expand Down Expand Up @@ -5946,26 +5947,9 @@ void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &stat

static CriticalSection deleteDllLock;
static IWorkQueueThread *deleteDllWorkQ = nullptr;
static unsigned podInfoInitCBId = 0;
static StringBuffer myPodName;

const char *queryMyPodName()
{
return myPodName;
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
auto updateFunc = [&](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration)
{
if (myPodName.length()) // called at config load time, and never needs to be refreshed
return;
// process pod information from environment
getEnvVar("MY_POD_NAME", myPodName.clear());
PROGLOG("The podName = %s", myPodName.str());
};
if (isContainerized())
podInfoInitCBId = installConfigUpdateHook(updateFunc, true);
return true;
}
MODULE_EXIT()
Expand All @@ -5974,8 +5958,6 @@ MODULE_EXIT()
if (deleteDllWorkQ)
::Release(deleteDllWorkQ);
deleteDllWorkQ = nullptr;

removeConfigUpdateHook(podInfoInitCBId);
}
static void asyncRemoveDll(const char * name)
{
Expand Down Expand Up @@ -14248,7 +14230,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
{
CCycleTimer elapsedTimer;

bool multiJobLinger = config.getPropBool("@multiJobLinger", true);
bool multiJobLinger = config.getPropBool("@multiJobLinger", defaultThorMultiJobLinger);

// NB: executeGraphOnLingeringThor looks for existing Thor instance that has been used for the same job,
// and communicates with it directly
Expand Down Expand Up @@ -14310,7 +14292,7 @@ void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IP
else
{
VStringBuffer job("%s-%s", wuid.str(), graphName);
runK8sJob("thormanager", wuid, job, { { "graphName", graphName} });
k8s::runJob("thormanager", wuid, job, { { "graphName", graphName} });
}

/* In k8s, Thor feeds back the terminating exception via the workunit.
Expand Down Expand Up @@ -14646,304 +14628,9 @@ bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const
}
return false;
}

static void setResources(IPropertyTree *workerConfig, const IConstWorkUnit *workunit, const char *process)
{
auto setResourcesItem = [&workerConfig](const char *category, const char *resourceName, unsigned value, const char *units)
{
if (!value) return;
VStringBuffer xpath("spec/template/spec/containers/resources/%s@%s", category, resourceName);
ensurePTree(workerConfig, xpath.str());

VStringBuffer v("%u%s", value, units);
workerConfig->setProp(xpath.str(), v.str());
};

StringBuffer s;
unsigned memRequest = workunit->getDebugValueInt(s.clear().appendf("resource-%s-memory", process), 0);
setResourcesItem("requests", "memory", memRequest, "Mi");
setResourcesItem("limits", "memory", memRequest, "Mi");

unsigned cpuRequest = workunit->getDebugValueInt(s.clear().appendf("resource-%s-cpu", process), 0);
setResourcesItem("requests", "cpu", cpuRequest, "m");
setResourcesItem("limits", "cpu", cpuRequest, "m");
}

KeepK8sJobs translateKeepJobs(const char *keepJob)
{
if (!isEmptyString(keepJob)) // common case
{
if (streq("podfailures", keepJob))
return KeepK8sJobs::podfailures;
else if (streq("all", keepJob))
return KeepK8sJobs::all;
}
return KeepK8sJobs::none;
}

bool isActiveK8sService(const char *serviceName)
{
VStringBuffer getEndpoints("kubectl get endpoints %s \"--output=jsonpath={range .subsets[*].addresses[*]}{.ip}{'\\n'}{end}\"", serviceName);
StringBuffer output;
runKubectlCommand("checkEndpoints", getEndpoints.str(), nullptr, &output);
// Output should be zero or more lines each with an IP
return (output.length() && output.charAt(0) != '\n');
}

void deleteK8sResource(const char *componentName, const char *resourceType, const char *job)
{
VStringBuffer resourceName("%s-%s-%s", componentName, resourceType, job);
resourceName.toLowerCase();
VStringBuffer deleteResource("kubectl delete %s/%s", resourceType, resourceName.str());
runKubectlCommand(componentName, deleteResource, nullptr, nullptr);

// have to assume command succeeded (if didn't throw exception)
// NB: file will only exist if autoCleanup used (it's okay not to exist)
StringBuffer jobName(job);
jobName.toLowerCase();
VStringBuffer k8sResourcesFilename("%s,%s,%s.k8s", componentName, resourceType, jobName.str());
remove(k8sResourcesFilename);
}

void waitK8sJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, KeepK8sJobs keepJob)
{
VStringBuffer jobName("%s-%s-%s", componentName, resourceType, job);
jobName.toLowerCase();
VStringBuffer waitJob("kubectl get jobs %s -o jsonpath={.status.active}", jobName.str());
VStringBuffer getScheduleStatus("kubectl get pods --selector=job-name=%s --output=jsonpath={.items[*].status.conditions[?(@.type=='PodScheduled')].status}", jobName.str());
VStringBuffer checkJobExitCode("kubectl get pods --selector=job-name=%s --output=jsonpath={.items[*].status.containerStatuses[?(@.name==\"%s\")].state.terminated.exitCode}", jobName.str(), jobName.str());

unsigned delay = 100;
unsigned start = msTick();

bool schedulingTimeout = false;
Owned<IException> exception;
try
{
for (;;)
{
StringBuffer output;
runKubectlCommand(componentName, waitJob, nullptr, &output);
if (!streq(output, "1")) // status.active value
{
// Job is no longer active - we can terminate
DBGLOG("kubectl jobs output: %s", output.str());
runKubectlCommand(componentName, checkJobExitCode, nullptr, &output.clear());
if (output.length() && !streq(output, "0")) // state.terminated.exitCode
throw makeStringExceptionV(0, "Failed to run %s: pod exited with error: %s", jobName.str(), output.str());
break;
}
runKubectlCommand(nullptr, getScheduleStatus, nullptr, &output.clear());

// Check whether pod has been scheduled yet - if resources are not available pods may block indefinitely waiting to be scheduled, and
// we would prefer them to fail instead.
bool pending = streq(output, "False");
if (pendingTimeoutSecs && pending && msTick()-start > pendingTimeoutSecs*1000)
{
schedulingTimeout = true;
VStringBuffer getReason("kubectl get pods --selector=job-name=%s \"--output=jsonpath={range .items[*].status.conditions[?(@.type=='PodScheduled')]}{.reason}{': '}{.message}{end}\"", jobName.str());
runKubectlCommand(componentName, getReason, nullptr, &output.clear());
throw makeStringExceptionV(0, "Failed to run %s - pod not scheduled after %u seconds: %s ", jobName.str(), pendingTimeoutSecs, output.str());
}
MilliSleep(delay);
if (delay < 10000)
delay = delay * 2;
}
}
catch (IException *e)
{
EXCLOG(e, nullptr);
exception.setown(e);
}
if (keepJob != KeepK8sJobs::all)
{
// Delete jobs unless the pod failed and keepJob==podfailures
if ((nullptr == exception) || (KeepK8sJobs::podfailures != keepJob) || schedulingTimeout)
deleteK8sResource(componentName, "job", job);
}
if (exception)
throw exception.getClear();
}

bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional, bool autoCleanup)
{
StringBuffer jobName(job);
jobName.toLowerCase();
VStringBuffer jobSpecFilename("/etc/config/%s-%s.yaml", componentName, resourceType);
StringBuffer jobYaml;
try
{
jobYaml.loadFile(jobSpecFilename, false);
}
catch (IException *E)
{
if (!optional)
throw;
E->Release();
return false;
}
jobYaml.replaceString("_HPCC_JOBNAME_", jobName.str());

VStringBuffer args("\"--workunit=%s\"", wuid);
args.append(" \"--k8sJob=true\"");
for (const auto &p: extraParams)
{
if (hasPrefix(p.first.c_str(), "_HPCC_", false)) // job yaml substitution
jobYaml.replaceString(p.first.c_str(), p.second.c_str());
else
args.append(" \"--").append(p.first.c_str()).append('=').append(p.second.c_str()).append("\"");
}
jobYaml.replaceString("_HPCC_ARGS_", args.str());

// Disable ability change resources from within workunit
// - all values are unquoted by toYAML. This caused problems when previous string values are
// outputted unquoted and then treated as a non string type -e.g. labels in metadata.
// - Also, ability to control if and how much users may change resources should be provided.
#if 0
Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
if (factory)
{
Owned<IConstWorkUnit> workunit = factory->openWorkUnit(wuid);
if (workunit)
{
Owned<IPropertyTree> workerConfig = createPTreeFromYAMLString(jobYaml.length(), jobYaml.str(), 0, ptr_none, nullptr);
setResources(workerConfig, workunit, componentName);
toYAML(workerConfig, jobYaml.clear(), 2, 0);
}
}
#endif

runKubectlCommand(componentName, "kubectl replace --force -f -", jobYaml, nullptr);

if (autoCleanup)
{
// touch a file, with naming convention { componentName },{ resourceType },{ jobName }.k8s
// it will be used if the job fails ungracefully, to tidy up leaked resources
// normally (during graceful cleanup) these resources and files will be deleted by deleteK8sResource
VStringBuffer k8sResourcesFilename("%s,%s,%s.k8s", componentName, resourceType, jobName.str());
touchFile(k8sResourcesFilename);
}

return true;
}

static constexpr unsigned defaultPendingTimeSecs = 600;
void runK8sJob(const char *componentName, const char *wuid, const char *jobName, const std::list<std::pair<std::string, std::string>> &extraParams)
{
Owned<IPropertyTree> compConfig = getComponentConfig();
KeepK8sJobs keepJob = translateKeepJobs(compConfig->queryProp("@keepJobs"));
unsigned pendingTimeoutSecs = compConfig->getPropInt("@pendingTimeoutSecs", defaultPendingTimeSecs);

bool removeNetwork = applyK8sYaml(componentName, wuid, jobName, "networkpolicy", extraParams, true, true);
applyK8sYaml(componentName, wuid, jobName, "job", extraParams, false, KeepK8sJobs::none == keepJob);
Owned<IException> exception;
try
{
waitK8sJob(componentName, "job", jobName, pendingTimeoutSecs, keepJob);
}
catch (IException *e)
{
EXCLOG(e, nullptr);
exception.setown(e);
}
if (removeNetwork)
deleteK8sResource(componentName, "networkpolicy", jobName);
if (exception)
throw exception.getClear();
}

// returns a vector of {pod-name, node-name} vectors,
// represented as a nested vector for extensibility, e.g. to add other meta fields
std::vector<std::vector<std::string>> getPodNodes(const char *selector)
{
VStringBuffer getWorkerNodes("kubectl get pods --selector=job-name=%s \"--output=jsonpath={range .items[*]}{.metadata.name},{.spec.nodeName}{'\\n'}{end}\"", selector);
StringBuffer result;
runKubectlCommand("get-worker-nodes", getWorkerNodes, nullptr, &result);

if (result.isEmpty())
throw makeStringExceptionV(-1, "No worker nodes found for selector '%s'", selector);

const char *start = result.str();
const char *finger = start;
std::string fieldName;
std::vector<std::vector<std::string>> results;
std::vector<std::string> current;
while (true)
{
switch (*finger)
{
case ',':
{
if (start == finger)
throw makeStringException(-1, "getPodNodes: Missing node name(s) in output");
fieldName.assign(start, finger-start);
current.emplace_back(std::move(fieldName));
finger++;
start = finger;
break;
}
case '\n':
case '\0':
{
if (start == finger)
throw makeStringException(-1, "getPodNodes: Missing pod name(s) in output");
fieldName.assign(start, finger-start);
current.emplace_back(std::move(fieldName));
results.emplace_back(std::move(current));
if ('\0' == *finger)
return results;
finger++;
start = finger;
break;
}
default:
{
++finger;
break;
}
}
}
}

#else
KeepK8sJobs translateKeepJobs(const char *keepJobs)
{
throwUnexpected();
}

bool isActiveK8sService(const char *serviceName)
{
throwUnexpected();
}

bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName)
{
throwUnexpected();
}

void deleteK8sResource(const char *componentName, const char *job, const char *resource)
{
throwUnexpected();
}

void waitK8sJob(const char *componentName, const char *resourceType, const char *job, unsigned pendingTimeoutSecs, KeepK8sJobs keepJob)
{
throwUnexpected();
}

bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional, bool autoCleanup)
{
throwUnexpected();
}

void runK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams)
{
throwUnexpected();
}

std::vector<std::vector<std::string>> getPodNodes(const char *selector)
{
throwUnexpected();
}

#endif
18 changes: 3 additions & 15 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1773,24 +1773,12 @@ extern WORKUNIT_API bool isValidMemoryValue(const char * memoryUnit);

inline double calcCost(double ratePerHour, unsigned __int64 ms) { return ratePerHour * ms / 1000 / 3600; }

constexpr bool defaultThorMultiJobLinger = true;
constexpr unsigned defaultThorLingerPeriod = 60;
extern WORKUNIT_API void executeThorGraph(const char * graphName, IConstWorkUnit &workunit, const IPropertyTree &config);

enum class KeepK8sJobs { none, podfailures, all };
extern WORKUNIT_API KeepK8sJobs translateKeepJobs(const char *keepJobs);

extern WORKUNIT_API bool isActiveK8sService(const char *serviceName);
extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);
extern WORKUNIT_API void deleteK8sResource(const char *componentName, const char *job, const char *resource);
extern WORKUNIT_API void waitK8sJob(const char *componentName, const char *job, unsigned pendingTimeoutSecs, KeepK8sJobs keepJob);
extern WORKUNIT_API bool applyK8sYaml(const char *componentName, const char *wuid, const char *job, const char *resourceType, const std::list<std::pair<std::string, std::string>> &extraParams, bool optional, bool autoCleanup);
extern WORKUNIT_API void runK8sJob(const char *componentName, const char *wuid, const char *job, const std::list<std::pair<std::string, std::string>> &extraParams={});

// returns a vector of {pod-name, node-name} vectors,
extern WORKUNIT_API std::vector<std::vector<std::string>> getPodNodes(const char *selector);

extern WORKUNIT_API const char *queryMyPodName();

extern WORKUNIT_API TraceFlags loadTraceFlags(IConstWorkUnit * wu, const std::initializer_list<TraceOption> & y, TraceFlags dft);

extern WORKUNIT_API bool executeGraphOnLingeringThor(IConstWorkUnit &workunit, unsigned wfid, const char *graphName);

#endif
Loading

0 comments on commit 4a4e8dc

Please sign in to comment.