diff --git a/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvDAO.java b/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvDAO.java index a1b1ed61b..573c82ab3 100644 --- a/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvDAO.java +++ b/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvDAO.java @@ -42,8 +42,9 @@ public List hentStillingFraNavUtenSvarDerFristErUtlopt(long maxAn new AktivitetDataRowMapper()); } - public List hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(long maxAntall) { - SqlParameterSource parameter = new MapSqlParameterSource("maxAntall", maxAntall); + public List hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(long maxAntall, long sisteProsesserteVersjon) { + SqlParameterSource parameter = new MapSqlParameterSource("maxAntall", maxAntall) + .addValue("sisteProsesserteVersjon", sisteProsesserteVersjon); return jdbcTemplate.query(""" SELECT SFN.ARBEIDSGIVER as "STILLING_FRA_NAV.ARBEIDSGIVER", SFN.ARBEIDSSTED as "STILLING_FRA_NAV.ARBEIDSSTED", SFN.DETALJER AS "STILLING_FRA_NAV.DETALJER", @@ -56,8 +57,9 @@ AND LIVSLOPSTATUS_KODE IN('AVBRUTT','FULLFORT') AND HISTORISK_DATO is null AND SFN.LIVSLOPSSTATUS NOT IN('AVBRUTT_AV_BRUKER', 'AVBRUTT_AV_SYSTEM', 'HAR_SVART') AND SFN.CV_KAN_DELES IS NULL - order by A.AKTIVITET_ID - fetch first :maxAntall rows only + AND A.versjon > :sisteProsesserteVersjon + order by A.versjon + limit :maxAntall """, parameter, new AktivitetDataRowMapper()); diff --git a/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvManueltAvbruttService.java b/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvManueltAvbruttService.java deleted file mode 100644 index 5947997d3..000000000 --- a/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvManueltAvbruttService.java +++ /dev/null @@ -1,32 +0,0 @@ -package no.nav.veilarbaktivitet.stilling_fra_nav; - -import io.micrometer.core.annotation.Timed; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import no.nav.veilarbaktivitet.aktivitet.domain.AktivitetData; -import org.springframework.stereotype.Service; - -import java.util.List; - -@Slf4j -@Service -@RequiredArgsConstructor -public class DelingAvCvManueltAvbruttService { - - private final DelingAvCvService delingAvCvService; - private final DelingAvCvDAO delingAvCvDAO; - - @Timed(value = "stillingFraNavAvbruttEllerFullfortUtenSvar", histogram = true) - public int notifiserFullfortEllerAvbruttUtenSvar(int maxantall) { - List aktivitetData = delingAvCvDAO.hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(maxantall); - aktivitetData.forEach(aktivitet -> { - try { - delingAvCvService.notifiserAvbruttEllerFullfortUtenSvar(aktivitet); - } catch (Exception e) { - log.warn("Behandling av fullført/avbrutt aktivitet aktivitetId={} feilet", aktivitet.getId()); - log.error("Kunne ikke behandle avbrutt/fullført aktivitet", e); - } - }); - return aktivitetData.size(); - } -} diff --git a/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvService.java b/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvService.java index 87c73fbf8..7928eb38b 100644 --- a/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvService.java +++ b/src/main/java/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvService.java @@ -69,6 +69,7 @@ public void notifiserAvbruttEllerFullfortUtenSvar(AktivitetData aktivitet) { var endretAv = Person.systemUser(); AktivitetData nyAktivitet = aktivitet.toBuilder() .endretAv(endretAv.get()) + .endretDato(new Date()) .endretAvType(endretAv.tilInnsenderType()) .stillingFraNavData(aktivitet.getStillingFraNavData().withLivslopsStatus(LivslopsStatus.AVBRUTT_AV_BRUKER)) .build(); diff --git a/src/main/kotlin/no/nav/veilarbaktivitet/stilling_fra_nav/BatchTrackingDAO.kt b/src/main/kotlin/no/nav/veilarbaktivitet/stilling_fra_nav/BatchTrackingDAO.kt new file mode 100644 index 000000000..f52867ebc --- /dev/null +++ b/src/main/kotlin/no/nav/veilarbaktivitet/stilling_fra_nav/BatchTrackingDAO.kt @@ -0,0 +1,58 @@ +package no.nav.veilarbaktivitet.stilling_fra_nav + +import org.slf4j.LoggerFactory +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import org.springframework.stereotype.Repository + +enum class BatchJob { + Deling_av_cv_avbrutt_eller_fuulfort_uten_svar +} + +@Repository +open class BatchTrackingDAO( + val template: NamedParameterJdbcTemplate +) { + private val log = LoggerFactory.getLogger(javaClass) + + open fun setSisteProsesserteVersjon(batch: BatchJob, sisteProsesserteVersjon: Long) { + template.update(""" + INSERT INTO batch_tracking (batch_name, last_offset) VALUES (:batchName, :sisteProsesserteVersjon) + ON CONFLICT(batch_name) + DO UPDATE SET last_offset = :sisteProsesserteVersjon + """.trimIndent(), mapOf("sisteProsesserteVersjon" to sisteProsesserteVersjon, "batchName" to batch.name)) + } + + open fun hentSisteProsseserteVersjon(batch: BatchJob): Long { + val results = template.query(""" + SELECT last_offset FROM batch_tracking where batch_name = :batchName + """.trimIndent(), mapOf("batchName" to batch.name) + ) { row, _ -> row.getLong("last_offset") } + if (results.isEmpty()) { + log.warn("Could not find last_offset for batch: ${batch.name} ") + return 0 + } + return results.first() + } + + public fun withOffset(batch: BatchJob, processBatch: (sisteProsesserteVersjon: Long) -> List): List { + val siste = hentSisteProsseserteVersjon(batch) + val batchResults = processBatch(siste) + setSisteProsesserteVersjon(batch, batchResults.sisteProsesserteVersjon(siste)) + return batchResults + } +} + +sealed class BatchResult(val versjon: Long) { + class Success(versjon: Long): BatchResult(versjon) + class Failure(versjon: Long): BatchResult(versjon) +} + +fun List.sisteProsesserteVersjon(fallbackOffset: Long): Long { + val førsteFeiledeVersjon = this + .filterIsInstance() + .minByOrNull { it.versjon } + ?.versjon + return (førsteFeiledeVersjon?.minus(1) + ?: this.filterIsInstance().maxByOrNull { it.versjon }?.versjon) + ?: fallbackOffset +} diff --git a/src/main/kotlin/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvManueltAvbruttService.kt b/src/main/kotlin/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvManueltAvbruttService.kt new file mode 100644 index 000000000..0125fbc21 --- /dev/null +++ b/src/main/kotlin/no/nav/veilarbaktivitet/stilling_fra_nav/DelingAvCvManueltAvbruttService.kt @@ -0,0 +1,34 @@ +package no.nav.veilarbaktivitet.stilling_fra_nav + +import io.micrometer.core.annotation.Timed +import lombok.RequiredArgsConstructor +import no.nav.veilarbaktivitet.aktivitet.domain.AktivitetData +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service + +@Service +@RequiredArgsConstructor +open class DelingAvCvManueltAvbruttService( + private val delingAvCvService: DelingAvCvService, + private val delingAvCvDAO: DelingAvCvDAO, + private val batchTrackingDao: BatchTrackingDAO +) { + private val log = LoggerFactory.getLogger(javaClass) + + @Timed(value = "stillingFraNavAvbruttEllerFullfortUtenSvar", histogram = true) + open fun notifiserFullfortEllerAvbruttUtenSvar(maxantall: Int): Int { + return batchTrackingDao.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { sisteProsesserteVersjonFørBatch -> + val aktivitetData = delingAvCvDAO.hentStillingFraNavSomErFullfortEllerAvbruttUtenSvar(maxantall.toLong(), sisteProsesserteVersjonFørBatch) + aktivitetData.map { aktivitet: AktivitetData -> + try { + delingAvCvService.notifiserAvbruttEllerFullfortUtenSvar(aktivitet) + return@map BatchResult.Success(aktivitet.versjon) + } catch (e: Exception) { + log.warn("Behandling av fullført/avbrutt aktivitet aktivitetId=${aktivitet.id} feilet") + log.error("Kunne ikke behandle avbrutt/fullført aktivitet", e) + return@map BatchResult.Failure(aktivitet.versjon) + } + } + }.size + } +} diff --git a/src/main/resources/db/migration/V2__batch_tracking.sql b/src/main/resources/db/migration/V2__batch_tracking.sql new file mode 100644 index 000000000..c7f8a361b --- /dev/null +++ b/src/main/resources/db/migration/V2__batch_tracking.sql @@ -0,0 +1,5 @@ +CREATE TABLE IF NOT EXISTS batch_tracking( + batch_name varchar(60) primary key, + last_offset bigint +) + diff --git a/src/test/java/no/nav/veilarbaktivitet/arena/ArenaControllerTest.java b/src/test/java/no/nav/veilarbaktivitet/arena/ArenaControllerTest.java index 917d22f8a..1094ac44b 100644 --- a/src/test/java/no/nav/veilarbaktivitet/arena/ArenaControllerTest.java +++ b/src/test/java/no/nav/veilarbaktivitet/arena/ArenaControllerTest.java @@ -3,7 +3,6 @@ import io.getunleash.Unleash; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; import no.nav.common.client.aktoroppslag.AktorOppslagClient; import no.nav.common.types.identer.NavIdent; import no.nav.poao.dab.spring_auth.AuthService; @@ -40,7 +39,6 @@ import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; import org.springframework.web.server.ResponseStatusException; -import java.io.IOException; import java.time.ZonedDateTime; import java.util.*; @@ -85,17 +83,6 @@ class ArenaControllerTest { private final ForhaandsorienteringDTO forhaandsorientering = ForhaandsorienteringDTO.builder().type(Type.SEND_FORHAANDSORIENTERING).tekst("kake").build(); - ArenaControllerTest() throws IOException { - } - - static EmbeddedPostgres setUpDatabase() { - try { - return EmbeddedPostgres.start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - @BeforeEach void cleanup() { doThrow(new ResponseStatusException(HttpStatus.FORBIDDEN)) diff --git a/src/test/java/no/nav/veilarbaktivitet/stilling_fra_nav/BatchTrackingDAOTest.kt b/src/test/java/no/nav/veilarbaktivitet/stilling_fra_nav/BatchTrackingDAOTest.kt new file mode 100644 index 000000000..1edb67abd --- /dev/null +++ b/src/test/java/no/nav/veilarbaktivitet/stilling_fra_nav/BatchTrackingDAOTest.kt @@ -0,0 +1,58 @@ +package no.nav.veilarbaktivitet.stilling_fra_nav + +import no.nav.veilarbaktivitet.LocalDatabaseSingleton.postgres +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate +import javax.sql.DataSource + +class BatchTrackingDAOTest { + + private val db: DataSource = postgres + private val namedParameterJdbcTemplate = NamedParameterJdbcTemplate(db) + private val batchTrackingDAO = BatchTrackingDAO(namedParameterJdbcTemplate) + + @BeforeEach + fun setup() { + batchTrackingDAO.setSisteProsesserteVersjon(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar, 0) + } + + @Test + fun should_not_increment_offset_if_nothing_is_processsed() { + batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset -> + assertThat(offset).isEqualTo(0) + emptyList() + } + batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset -> + assertThat(offset).isEqualTo(0) + emptyList() + } + } + + @Test + fun should_set_offset_to_highest_success_result_processsed() { + batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset -> + assertThat(offset).isEqualTo(0) + listOf(BatchResult.Success(10), BatchResult.Success(20)) + } + batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset -> + assertThat(offset).isEqualTo(20) + emptyList() + } + } + + + @Test + fun should_set_offset_to_first_failure_result_processsed() { + batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset -> + assertThat(offset).isEqualTo(0) + listOf(BatchResult.Failure(10), BatchResult.Failure(2), BatchResult.Success(20)) + } + batchTrackingDAO.withOffset(BatchJob.Deling_av_cv_avbrutt_eller_fuulfort_uten_svar) { offset -> + assertThat(offset).isEqualTo(1) + emptyList() + } + } + +} \ No newline at end of file diff --git a/src/test/kotlin/no/nav/veilarbaktivitet/aktivitetskort/EksternaktivitetDAOTest.kt b/src/test/kotlin/no/nav/veilarbaktivitet/aktivitetskort/EksternaktivitetDAOTest.kt index d9fd8ab6d..0376374db 100644 --- a/src/test/kotlin/no/nav/veilarbaktivitet/aktivitetskort/EksternaktivitetDAOTest.kt +++ b/src/test/kotlin/no/nav/veilarbaktivitet/aktivitetskort/EksternaktivitetDAOTest.kt @@ -1,13 +1,10 @@ package no.nav.veilarbaktivitet.aktivitetskort -import io.zonky.test.db.postgres.junit.EmbeddedPostgresRules -import io.zonky.test.db.postgres.junit.SingleInstancePostgresRule import lombok.SneakyThrows import no.nav.veilarbaktivitet.LocalDatabaseSingleton import no.nav.veilarbaktivitet.aktivitet.AktivitetDAO import no.nav.veilarbaktivitet.testutils.AktivitetDataTestBuilder import org.assertj.core.api.Assertions.assertThat -import org.junit.Rule import org.junit.jupiter.api.Test import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate