Skip to content

Commit

Permalink
Batch tracking (#808)
Browse files Browse the repository at this point in the history
* WIP: Batch tracking

* Fix syntax feil i batchprograss dao

* Add test for BatchTrackingDao

* Remove unused dep

* Bruk _ for ubrukte parameter

---------

Co-authored-by: sigurdgroneng <[email protected]>
  • Loading branch information
holymaloney and tu55eladd authored Aug 2, 2024
1 parent 264ce19 commit 435a6d2
Show file tree
Hide file tree
Showing 9 changed files with 162 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public List<AktivitetData> hentStillingFraNavUtenSvarDerFristErUtlopt(long maxAn
new AktivitetDataRowMapper());
}

public List<AktivitetData> hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(long maxAntall) {
SqlParameterSource parameter = new MapSqlParameterSource("maxAntall", maxAntall);
public List<AktivitetData> hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(long maxAntall, long sisteProsesserteVersjon) {
SqlParameterSource parameter = new MapSqlParameterSource("maxAntall", maxAntall)
.addValue("sisteProsesserteVersjon", sisteProsesserteVersjon);
return jdbcTemplate.query("""
SELECT SFN.ARBEIDSGIVER as "STILLING_FRA_NAV.ARBEIDSGIVER", SFN.ARBEIDSSTED as "STILLING_FRA_NAV.ARBEIDSSTED",
SFN.DETALJER AS "STILLING_FRA_NAV.DETALJER",
Expand All @@ -56,8 +57,9 @@ AND LIVSLOPSTATUS_KODE IN('AVBRUTT','FULLFORT')
AND HISTORISK_DATO is null
AND SFN.LIVSLOPSSTATUS NOT IN('AVBRUTT_AV_BRUKER', 'AVBRUTT_AV_SYSTEM', 'HAR_SVART')
AND SFN.CV_KAN_DELES IS NULL
order by A.AKTIVITET_ID
fetch first :maxAntall rows only
AND A.versjon > :sisteProsesserteVersjon
order by A.versjon
limit :maxAntall
""",
parameter,
new AktivitetDataRowMapper());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void notifiserAvbruttEllerFullfortUtenSvar(AktivitetData aktivitet) {
var endretAv = Person.systemUser();
AktivitetData nyAktivitet = aktivitet.toBuilder()
.endretAv(endretAv.get())
.endretDato(new Date())
.endretAvType(endretAv.tilInnsenderType())
.stillingFraNavData(aktivitet.getStillingFraNavData().withLivslopsStatus(LivslopsStatus.AVBRUTT_AV_BRUKER))
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package no.nav.veilarbaktivitet.stilling_fra_nav

import org.slf4j.LoggerFactory
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import org.springframework.stereotype.Repository

enum class BatchJob {
Deling_av_cv_avbrutt_eller_fuulfort_uten_svar
}

@Repository
open class BatchTrackingDAO(
val template: NamedParameterJdbcTemplate
) {
private val log = LoggerFactory.getLogger(javaClass)

open fun setSisteProsesserteVersjon(batch: BatchJob, sisteProsesserteVersjon: Long) {
template.update("""
INSERT INTO batch_tracking (batch_name, last_offset) VALUES (:batchName, :sisteProsesserteVersjon)
ON CONFLICT(batch_name)
DO UPDATE SET last_offset = :sisteProsesserteVersjon
""".trimIndent(), mapOf("sisteProsesserteVersjon" to sisteProsesserteVersjon, "batchName" to batch.name))
}

open fun hentSisteProsseserteVersjon(batch: BatchJob): Long {
val results = template.query("""
SELECT last_offset FROM batch_tracking where batch_name = :batchName
""".trimIndent(), mapOf("batchName" to batch.name)
) { row, _ -> row.getLong("last_offset") }
if (results.isEmpty()) {
log.warn("Could not find last_offset for batch: ${batch.name} ")
return 0
}
return results.first()
}

public fun withOffset(batch: BatchJob, processBatch: (sisteProsesserteVersjon: Long) -> List<BatchResult>): List<BatchResult> {
val siste = hentSisteProsseserteVersjon(batch)
val batchResults = processBatch(siste)
setSisteProsesserteVersjon(batch, batchResults.sisteProsesserteVersjon(siste))
return batchResults
}
}

sealed class BatchResult(val versjon: Long) {
class Success(versjon: Long): BatchResult(versjon)
class Failure(versjon: Long): BatchResult(versjon)
}

fun List<BatchResult>.sisteProsesserteVersjon(fallbackOffset: Long): Long {
val førsteFeiledeVersjon = this
.filterIsInstance<BatchResult.Failure>()
.minByOrNull { it.versjon }
?.versjon
return (førsteFeiledeVersjon?.minus(1)
?: this.filterIsInstance<BatchResult.Success>().maxByOrNull { it.versjon }?.versjon)
?: fallbackOffset
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package no.nav.veilarbaktivitet.stilling_fra_nav

import io.micrometer.core.annotation.Timed
import lombok.RequiredArgsConstructor
import no.nav.veilarbaktivitet.aktivitet.domain.AktivitetData
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
@RequiredArgsConstructor
open class DelingAvCvManueltAvbruttService(
private val delingAvCvService: DelingAvCvService,
private val delingAvCvDAO: DelingAvCvDAO,
private val batchTrackingDao: BatchTrackingDAO
) {
private val log = LoggerFactory.getLogger(javaClass)

@Timed(value = "stillingFraNavAvbruttEllerFullfortUtenSvar", histogram = true)
open fun notifiserFullfortEllerAvbruttUtenSvar(maxantall: Int): Int {
return batchTrackingDao.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { sisteProsesserteVersjonFørBatch ->
val aktivitetData = delingAvCvDAO.hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(maxantall.toLong(), sisteProsesserteVersjonFørBatch)
aktivitetData.map { aktivitet: AktivitetData ->
try {
delingAvCvService.notifiserAvbruttEllerFullfortUtenSvar(aktivitet)
return@map BatchResult.Success(aktivitet.versjon)
} catch (e: Exception) {
log.warn("Behandling av fullført/avbrutt aktivitet aktivitetId=${aktivitet.id} feilet")
log.error("Kunne ikke behandle avbrutt/fullført aktivitet", e)
return@map BatchResult.Failure(aktivitet.versjon)
}
}
}.size
}
}
5 changes: 5 additions & 0 deletions src/main/resources/db/migration/V2__batch_tracking.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS batch_tracking(
batch_name varchar(60) primary key,
last_offset bigint
)

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.getunleash.Unleash;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
import no.nav.common.client.aktoroppslag.AktorOppslagClient;
import no.nav.common.types.identer.NavIdent;
import no.nav.poao.dab.spring_auth.AuthService;
Expand Down Expand Up @@ -40,7 +39,6 @@
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.web.server.ResponseStatusException;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.*;

Expand Down Expand Up @@ -85,17 +83,6 @@ class ArenaControllerTest {

private final ForhaandsorienteringDTO forhaandsorientering = ForhaandsorienteringDTO.builder().type(Type.SEND_FORHAANDSORIENTERING).tekst("kake").build();

ArenaControllerTest() throws IOException {
}

static EmbeddedPostgres setUpDatabase() {
try {
return EmbeddedPostgres.start();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@BeforeEach
void cleanup() {
doThrow(new ResponseStatusException(HttpStatus.FORBIDDEN))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package no.nav.veilarbaktivitet.stilling_fra_nav

import no.nav.veilarbaktivitet.LocalDatabaseSingleton.postgres
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate
import javax.sql.DataSource

class BatchTrackingDAOTest {

private val db: DataSource = postgres
private val namedParameterJdbcTemplate = NamedParameterJdbcTemplate(db)
private val batchTrackingDAO = BatchTrackingDAO(namedParameterJdbcTemplate)

@BeforeEach
fun setup() {
batchTrackingDAO.setSisteProsesserteVersjon(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar, 0)
}

@Test
fun should_not_increment_offset_if_nothing_is_processsed() {
batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset ->
assertThat(offset).isEqualTo(0)
emptyList()
}
batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset ->
assertThat(offset).isEqualTo(0)
emptyList()
}
}

@Test
fun should_set_offset_to_highest_success_result_processsed() {
batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset ->
assertThat(offset).isEqualTo(0)
listOf(BatchResult.Success(10), BatchResult.Success(20))
}
batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset ->
assertThat(offset).isEqualTo(20)
emptyList()
}
}


@Test
fun should_set_offset_to_first_failure_result_processsed() {
batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset ->
assertThat(offset).isEqualTo(0)
listOf(BatchResult.Failure(10), BatchResult.Failure(2), BatchResult.Success(20))
}
batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset ->
assertThat(offset).isEqualTo(1)
emptyList()
}
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package no.nav.veilarbaktivitet.aktivitetskort

import io.zonky.test.db.postgres.junit.EmbeddedPostgresRules
import io.zonky.test.db.postgres.junit.SingleInstancePostgresRule
import lombok.SneakyThrows
import no.nav.veilarbaktivitet.LocalDatabaseSingleton
import no.nav.veilarbaktivitet.aktivitet.AktivitetDAO
import no.nav.veilarbaktivitet.testutils.AktivitetDataTestBuilder
import org.assertj.core.api.Assertions.assertThat
import org.junit.Rule
import org.junit.jupiter.api.Test
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate

Expand Down

0 comments on commit 435a6d2

Please sign in to comment.