Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop' into sql/feature/suppor…
Browse files Browse the repository at this point in the history
…t_multi_column_validity_dates
  • Loading branch information
awildturtok committed Aug 15, 2023
2 parents 36d30bb + c822739 commit 1a52450
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 127 deletions.
1 change: 0 additions & 1 deletion .github/workflows/test_backend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,4 @@ jobs:
- name: JSON based Integration Tests
run: mvn test -T 1C -pl backend -Dgroups="INTEGRATION_JSON"
- name: SQL based Integration Tests
if: ${{ startsWith(github.head_ref, 'sql/') }}
run: mvn test -T 1C -pl backend -Dgroups="INTEGRATION_SQL_BACKEND"
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public abstract class ManagedExecution extends IdentifiableImpl<ManagedExecution

@JsonIgnore
@EqualsAndHashCode.Exclude
private transient DistributedNamespace namespace;
private transient Namespace namespace;
@JsonIgnore
@EqualsAndHashCode.Exclude
private transient ConqueryConfig config;
Expand Down Expand Up @@ -161,7 +161,7 @@ public final void initExecutable(Namespace namespace, ConqueryConfig config) {
label = makeAutoLabel(new PrintSettings(true, I18n.LOCALE.get(), namespace, config, null));
}

this.namespace = ((DistributedNamespace) namespace);
this.namespace = namespace;
this.config = config;

doInitExecutable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.bakdata.conquery.models.query.resultinfo.ResultInfo;
import com.bakdata.conquery.models.query.results.EntityResult;
import com.bakdata.conquery.models.query.results.FormShardResult;
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.OptBoolean;
Expand Down Expand Up @@ -205,4 +206,8 @@ private boolean allSubQueriesDone() {
return flatSubQueries.values().stream().allMatch(q -> q.getState().equals(ExecutionState.DONE));
}
}

public DistributedNamespace getNamespace() {
return (DistributedNamespace) super.getNamespace();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package com.bakdata.conquery.models.messages.network.specific;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
Expand All @@ -14,7 +20,6 @@
import com.bakdata.conquery.util.io.ConqueryMDC;
import com.bakdata.conquery.util.progressreporter.ProgressReporter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import lombok.AccessLevel;
Expand All @@ -25,53 +30,62 @@
import lombok.ToString;

/**
* Messages are sent serialized and only deserialized when they are being processed. This ensures that messages that were sent just shortly before to setup state later messages depend upon is correct.
* @implNote Messages are sent serialized and only deserialized when they are being processed. This ensures that messages that were sent just shortly before to setup state later messages depend upon is correct.
* @implNote Messages are additionally sent gzipped, to avoid hogging memory with long queues.
*/
@CPSType(id = "FORWARD_TO_WORKER", base = NetworkMessage.class)
@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@ToString(of = {"workerId", "text"})
public class ForwardToWorker extends MessageToShardNode implements SlowMessage {

@SneakyThrows(JsonProcessingException.class)
private final WorkerId workerId;
private final byte[] messageRaw;
// We cache these on the sender side.
@Getter(onMethod_ = @JsonIgnore(false))
private final boolean slowMessage;
private final String text;
@JsonIgnore
@Setter
private ProgressReporter progressReporter;

public static ForwardToWorker create(WorkerId worker, WorkerMessage message, ObjectWriter writer) {
return new ForwardToWorker(
worker,
writer.writeValueAsBytes(message),
serializeMessage(message, writer),
true,
message.toString()
);
}

private final WorkerId workerId;
private final byte[] messageRaw;
@SneakyThrows(IOException.class)
private static byte[] serializeMessage(WorkerMessage message, ObjectWriter writer) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream outputStream = new GZIPOutputStream(baos)) {
writer.writeValue(outputStream, message);
}

// We cache these on the sender side.
@Getter(onMethod_ = @JsonIgnore(false))
private final boolean slowMessage;
private final String text;
return baos.toByteArray();
}

@JsonIgnore
@Setter
private ProgressReporter progressReporter;
private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper mapper) throws java.io.IOException {
return mapper.readerFor(WorkerMessage.class).readValue(new GZIPInputStream(new ByteArrayInputStream(messageRaw)));
}

@Override
public void react(ShardNodeNetworkContext context) throws Exception {
Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId));
final Worker worker = Objects.requireNonNull(context.getWorkers().getWorker(workerId));
ConqueryMDC.setLocation(worker.toString());


// Jobception: this is to ensure that no subsequent message is deserialized before one message is processed
worker.getJobManager().addSlowJob(new SimpleJob("Deserialize and process WorkerMessage", () -> {
worker.getJobManager().addSlowJob(new SimpleJob("Process %s".formatted(getText()), () -> {

WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper());
final WorkerMessage message = deserializeMessage(messageRaw, worker.getCommunicationMapper());

message.setProgressReporter(progressReporter);
message.react(worker);
}));
}

private static WorkerMessage deserializeMessage(byte[] messageRaw, ObjectMapper binaryMapper) throws java.io.IOException {
return binaryMapper.readerFor(WorkerMessage.class).readValue(messageRaw);
}
}
Original file line number Diff line number Diff line change
@@ -1,72 +1,163 @@
<#import "templates/template.html.ftl" as layout>
<#import "templates/breadcrumbs.html.ftl" as breadcrumbs>
<#import "templates/accordion.html.ftl" as accordion>

<@layout.layout>
<div class="row">
<div class="col">
<input type="checkbox" id="update" name="update" checked>
<label for="update">Reload automatically.</label><br>
<script type="text/javascript">
setTimeout(function () {
if(!document.getElementById("update").checked){
return
}
location.reload(false);
}, 5000);
function cancelJob(jobId) {
event.preventDefault();
fetch(
${r"`/admin/jobs/${jobId}/cancel`"},
{
method: "post",
credentials: "same-origin"
}
)
}
</script>
</div>
</div>


<#list c as status>
<div class="row">
<div class="col">
<div class="card">
<div class="card-body">
<h5 class="card-title">
${status.origin} ${(status.dataset)!}
<span class="float-right">
<small>updated ${status.ageString} ago</small>
<span class="badge badge-secondary">${status.jobs?size}</span>
</span>
</h5>
<div class="card-text" style="max-height:50vh; overflow: auto">
<table class="table">
<#list status.jobs as job>
<tr class="${job.cancelled?then('active','')}">
<td>
${job.label}
</td>
<td class="w-100">
<div class="progress position-relative">
<div class="progress-bar" role="progressbar" style="width: ${job.progress?string.percent}" aria-valuenow="${job.progress}" aria-valuemin="0" aria-valuemax="1"></div>
</div>
</td>
<td>
<#if !job.cancelled>
<a href="" onclick="cancelJob('${job.jobId}')" class="btn btn-warning btn-sm"/>
<#else>
<div>Cancelled</div>
</#if>
</td>
</tr>
</#list>
</table>
</div>
</div>
</div>
</div>
</div>
</#list>
</@layout.layout>
<script type="text/javascript">
function cancelJob(jobId) {
event.preventDefault();
fetch(
"/admin/jobs/" + jobId + "/cancel",
{
method: "post",
credentials: "same-origin"
}
);
}
function getJobs() {
return fetch("/admin/jobs")
.then((res) => res.json())
.then((entries) => {
const origins = {};
entries.forEach((entry) => {
const origin = entry.origin
origins[entry.origin] = [
...(origins[entry.origin] ?? []),
entry
];
});
return origins;
});
}
function findNodeOrCloneTemplate(templateId, newId = "", parentNode) {
if (newId !== "" && document.getElementById(newId)) {
return document.getElementById(newId);
} else {
const clonedTemplate = document.getElementById(templateId).cloneNode(true);
clonedTemplate.id = newId;
parentNode?.appendChild(clonedTemplate);
return clonedTemplate;
}
}
async function refreshJobs() {
const origins = await getJobs();
Object.keys(origins).forEach((origin) => {
const nodes = origins[origin];
const accordion = findNodeOrCloneTemplate("originTemplate", "origin_" + origin, document.getElementById("nodesAccordionGroup"));
accordion.querySelector("h5").innerText = origin;
const accordionDetails = findNodeOrCloneTemplate("originDetailsTemplate");
const timeDifference = (new Date() - new Date(nodes[0].timestamp)) / 1000;
accordionDetails.querySelector(".ageString").innerText = timeDifference + "s";
accordionDetails.querySelector(".jobsAmount").innerText = nodes
.map((node) => node?.jobs.length ?? 0)
.reduce((partialSum, x) => partialSum + x, 0);
accordion.querySelector(".accordion-infotext").innerHTML = "";
accordion.querySelector(".accordion-infotext").appendChild(accordionDetails);
nodes.forEach((node) => {
const fullName = node.origin + (node.dataset ? "::" + node.dataset : "")
const nodeElement = findNodeOrCloneTemplate("nodeTemplate", "node_" + fullName, accordion.querySelector(".accordionContent"));
nodeElement.querySelector(".nodeName").innerText = fullName;
nodeElement.querySelector(".jobsAmount").innerText = node?.jobs?.length ?? "0";
const jobsList = nodeElement.querySelector(".jobsList");
if (node?.jobs.length > 0) {
jobsList.innerHTML = "";
node.jobs.forEach((job) => {
const jobElement = findNodeOrCloneTemplate("jobTemplate", "job_" + job.jobId, jobsList);
jobElement.querySelector(".jobLabel").innerText = job?.label;
jobElement.querySelector(".jobLabel").title = job?.label;
jobElement.querySelector(".jobProgress").style.width = Math.round((job?.progress ?? 0) * 100) + "%";
jobElement.querySelector(".jobProgress").attributes["aria-valuenow"] = job?.progress;
const jobActionElement = jobElement.querySelector(".jobAction");
if (job?.cancelled) {
jobActionElement.innerText = 'Cancelled';
} else {
jobActionElement.querySelector("button").onclick = () => cancelJob(job?.jobId);
}
});
} else {
jobsList.innerHTML = "";
findNodeOrCloneTemplate("nojobsTemplate", "", jobsList);
}
});
});
// collapse accordions on page laod
if (!this.alreadyExecuted) {
this.alreadyExecuted = true;
document.querySelectorAll(".collapse").forEach((elem) => elem.classList.remove("show"));
}
}
refreshJobs();
setInterval(function () {
if(!document.getElementById("update")?.checked) return;
refreshJobs();
}, 5000);
</script>
<@breadcrumbs.breadcrumbs
labels=["Jobs"]
/>
<div class="d-flex justify-content-end">
<div class="custom-control custom-switch">
<input type="checkbox" class="custom-control-input" id="update" checked>
<label class="custom-control-label" for="update">Reload automatically</label>
</div>
</div>
<@accordion.accordionGroup id="nodesAccordionGroup" class="mt-3"></@accordion.accordionGroup>
</@layout.layout>
<!-- HTML Templates -->
<div class="d-none">
<@accordion.accordion summary="" id="originTemplate"></@accordion.accordion>
<div id="originDetailsTemplate">
updated <span class="ageString"></span> ago
<span class="jobsAmount badge badge-secondary"></span>
</div>
<div id="nodeTemplate">
<div class="d-flex justify-content-between align-items-center">
<span class="nodeName py-2" style="font-size: 1.2rem;"></span>
<div>
<span class="jobsAmount badge badge-secondary"></span>
</div>
</div>
<div class="jobsList p-2 mb-3 border" style="max-height: 200px; overflow: auto;"></div>
</div>
<div id="nojobsTemplate" class="w-100 text-black-50 text-center">No jobs in this node</div>
<div
id="jobTemplate"
class="d-flex justify-content-between align-items-center py-2"
style="gap: 25px; border-bottom: 1px solid #ccc"
>
<div
class="jobLabel text-nowrap"
style="overflow: hidden; text-overflow: ellipsis; flex-basis: 600px;"
></div>
<div class="progress position-relative" style="flex-grow: 1;">
<div class="jobProgress progress-bar" role="progressbar" aria-valuemin="0" aria-valuemax="1"></div>
</div>
<div class="jobAction d-flex justify-content-center" style="flex-basis: 80px;">
<button
type="button"
class="btn btn-danger btn-sm text-white fas fa-ban"
data-toggle="tooltip"
data-placement="bottom"
title="Cancel Job"
/>
</div>
</div>
</div>
Loading

0 comments on commit 1a52450

Please sign in to comment.