From a25080f4f15598117c6f18894ffe5dea725d7ba0 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Mon, 22 Sep 2025 15:36:09 +0200 Subject: [PATCH 01/10] When too many chunks are in PENDING state, the busy loop going over every item becomes very inefficient. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Improve the situation with two changes: * Hardcode a max of 1_000 chunks to be added to the internal queue * Prioritize RUNNING job chunks in queue population 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../io/tolgee/batch/BatchJobChunkExecutionQueue.kt | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt index 6b0ddc2851..de567de4dd 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt @@ -63,14 +63,21 @@ class BatchJobChunkExecutionQueue( from BatchJobChunkExecution bjce join bjce.batchJob bk where bjce.status = :executionStatus - order by bjce.createdAt asc, bjce.executeAfter asc, bjce.id asc + order by + case when bk.status = 'RUNNING' then 0 else 1 end, + bjce.createdAt asc, + bjce.executeAfter asc, + bjce.id asc """.trimIndent(), BatchJobChunkExecutionDto::class.java, ).setParameter("executionStatus", BatchJobChunkExecutionStatus.PENDING) .setHint( "jakarta.persistence.lock.timeout", LockOptions.SKIP_LOCKED, - ).resultList + ) + // Limit to get pending batches faster + .setMaxResults(1_000) + .resultList if (data.size > 0) { logger.debug("Attempt to add ${data.size} items to queue ${System.identityHashCode(this)}") From bbb195d3559c0ed87367e6936a6adb02c3d68ab5 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Mon, 29 Sep 2025 21:02:18 +0200 Subject: [PATCH 02/10] fix: increase hardcoded 30s timeout and add a retry for OpenAI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Remove the hardcoded 30-second timeout override and use the [60, 120] configuration instead. This resolves the issue where OpenAI calls were timing out at 30s on the same chunks over and over.
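For clarity, a minimal sketch of how such a per-attempt timeout list is typically consumed — assuming, as the subject above implies, that each entry is the timeout in seconds for one attempt and that a retry only happens after the previous attempt timed out. The helper below (`callWithTimeouts`) is illustrative, not part of the Tolgee codebase:

```kotlin
import java.util.concurrent.TimeoutException

// Illustrative sketch: one call per configured timeout, so listOf(60, 120)
// means a first attempt capped at 60s and a single retry capped at 120s.
fun <T> callWithTimeouts(timeoutsSeconds: List<Int>, doCall: (timeoutSeconds: Int) -> T): T {
    var lastError: Exception? = null
    for (timeout in timeoutsSeconds) {
        try {
            return doCall(timeout)
        } catch (e: TimeoutException) {
            lastError = e // give the next, longer attempt a chance
        }
    }
    throw lastError ?: IllegalStateException("No attempts configured")
}

fun main() {
    val result = callWithTimeouts(listOf(60, 120)) { timeout ->
        "pretend OpenAI response obtained within ${timeout}s"
    }
    println(result)
}
```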
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../main/kotlin/io/tolgee/ee/component/llm/OpenaiApiService.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ee/backend/app/src/main/kotlin/io/tolgee/ee/component/llm/OpenaiApiService.kt b/ee/backend/app/src/main/kotlin/io/tolgee/ee/component/llm/OpenaiApiService.kt index 35331a051b..a993d04c9d 100644 --- a/ee/backend/app/src/main/kotlin/io/tolgee/ee/component/llm/OpenaiApiService.kt +++ b/ee/backend/app/src/main/kotlin/io/tolgee/ee/component/llm/OpenaiApiService.kt @@ -24,7 +24,7 @@ import org.springframework.web.client.RestTemplate @Component @Scope(value = ConfigurableBeanFactory.SCOPE_SINGLETON) class OpenaiApiService(private val jacksonObjectMapper: ObjectMapper) : AbstractLlmApiService(), Logging { - override fun defaultAttempts(): List<Int> = listOf(30) + override fun defaultAttempts(): List<Int> = listOf(60, 120) override fun translate( params: LlmParams, From 7e8091344e26f2e52f29974f73d35cec8b709472 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Mon, 29 Sep 2025 10:36:46 +0200 Subject: [PATCH 03/10] feat: add JSON logging configuration for structured log output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add logback-spring.xml with profile-based configuration supporting both standard and JSON log formats. JSON logging enabled with 'json-logging' profile for improved log analysis and monitoring capabilities. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../app/src/main/resources/logback-spring.xml | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 backend/app/src/main/resources/logback-spring.xml diff --git a/backend/app/src/main/resources/logback-spring.xml b/backend/app/src/main/resources/logback-spring.xml new file mode 100644 index 0000000000..e48544c7b7 --- /dev/null +++ b/backend/app/src/main/resources/logback-spring.xml @@ -0,0 +1,28 @@ + <?xml version="1.0" encoding="UTF-8"?> + <configuration> + + <!-- Standard console output (default) --> + <springProfile name="!json-logging"> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + </root> + </springProfile> + + <!-- JSON console output for the 'json-logging' profile --> + <springProfile name="json-logging"> + <appender name="CONSOLE_JSON" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>{"timestamp":"%d{yyyy-MM-dd'T'HH:mm:ss.SSSZ}","level":"%p","logger":"%c","message":"%m","thread":"%t","exception":"%ex{full}"}%n</pattern> + </encoder> + </appender> + <root level="INFO"> + <appender-ref ref="CONSOLE_JSON"/> + </root> + </springProfile> + + </configuration> \ No newline at end of file From 9c76a4cf5baadb3a56b3123cb846844f30de59f4 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Thu, 25 Sep 2025 14:50:47 +0200 Subject: [PATCH 04/10] Disable very verbose trace to allow activating trace in production --- .../main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt index 252ff348e2..10f0ff580b 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt @@ -101,7 +101,9 @@ class BatchJobConcurrentLauncher( return@repeatForever false } - logger.trace("Jobs to launch: $jobsToLaunch") + // This trace will spam the logging output + // (one log every 100ms), so it's commented out for now + // logger.trace("Jobs to launch: $jobsToLaunch") val items = (1..jobsToLaunch) .mapNotNull { batchJobChunkExecutionQueue.poll() } From 0e4b132f75ecf028c60c2d9d4e1f4833832c11dc Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Thu, 25 Sep 2025 10:21:13 +0200 Subject: [PATCH 05/10] feat: add REST API for project batch locks and job queue
inspection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add administration endpoints for viewing and managing project batch job locks: - GET /v2/administration/project-batch-locks - retrieve all project locks - GET /v2/administration/batch-job-queue - returns all chunk execution items Features: - Admin-only access with @RequiresSuperAuthentication - Works with both Redis and local storage configurations - Provides lock status (UNLOCKED/UNINITIALIZED/LOCKED) and job details - Comprehensive test coverage - Full OpenAPI documentation 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../controllers/ProjectBatchLockController.kt | 150 ++++++++++++++++++ .../ProjectBatchLockControllerTest.kt | 135 ++++++++++++++++ .../batch/BatchJobChunkExecutionQueue.kt | 4 + 3 files changed, 289 insertions(+) create mode 100644 backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt create mode 100644 backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt diff --git a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt new file mode 100644 index 0000000000..603165c019 --- /dev/null +++ b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt @@ -0,0 +1,150 @@ +package io.tolgee.api.v2.controllers + +import io.swagger.v3.oas.annotations.Operation +import io.swagger.v3.oas.annotations.tags.Tag +import io.tolgee.batch.BatchJobChunkExecutionQueue +import io.tolgee.batch.BatchJobProjectLockingManager +import io.tolgee.batch.BatchJobService +import io.tolgee.batch.JobCharacter +import io.tolgee.model.batch.BatchJobStatus +import io.tolgee.openApiDocs.OpenApiSelfHostedExtension +import io.tolgee.security.authentication.RequiresSuperAuthentication +import io.tolgee.util.Logging +import io.tolgee.util.logger +import org.springframework.hateoas.CollectionModel +import org.springframework.web.bind.annotation.CrossOrigin +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController + +/** + * REST API for managing project batch job locks + */ +@RestController +@CrossOrigin(origins = ["*"]) +@RequestMapping("/v2/administration") +@Tag( + name = "Server Administration", + description = "**Only for self-hosted instances** \n\n" + + "Management of project-level batch job locks and queue inspection for debugging and maintenance." 
+) +@OpenApiSelfHostedExtension +class ProjectBatchLockController( + private val batchJobProjectLockingManager: BatchJobProjectLockingManager, + private val batchJobService: BatchJobService, + private val batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue, +) : IController, Logging { + + @GetMapping("/project-batch-locks") + @Operation( + summary = "Get all project batch locks", + description = "Returns current project batch job locks from Redis or local storage based on configuration" + ) + @RequiresSuperAuthentication + fun getProjectLocks(): CollectionModel { + logger.debug("Retrieving all project batch locks") + + val locks = batchJobProjectLockingManager.getMap() + val lockModels = locks.map { (projectId, lockedJobId) -> + val lockStatus = when (lockedJobId) { + null -> LockStatus.UNINITIALIZED + 0L -> LockStatus.UNLOCKED + else -> LockStatus.LOCKED + } + + val jobInfo = if (lockedJobId != null && lockedJobId > 0L) { + try { + val jobDto = batchJobService.getJobDto(lockedJobId) + JobInfo( + jobId = jobDto.id, + status = jobDto.status, + type = jobDto.type, + createdAt = jobDto.createdAt + ) + } catch (e: Exception) { + logger.warn("Could not retrieve job info for locked job $lockedJobId in project $projectId", e) + null + } + } else { + null + } + + ProjectLockModel( + projectId = projectId, + lockedJobId = lockedJobId, + lockStatus = lockStatus, + jobInfo = jobInfo + ) + } + + logger.debug("Retrieved ${lockModels.size} project batch locks") + return CollectionModel.of(lockModels) + } + + @GetMapping("/batch-job-queue") + @Operation( + summary = "Get current batch job queue", + description = "Returns all chunk execution items currently in the batch job queue" + ) + @RequiresSuperAuthentication + fun getBatchJobQueue(): CollectionModel { + logger.debug("Retrieving current batch job queue") + + val queueItems = batchJobChunkExecutionQueue.getAllQueueItems() + val queueModels = queueItems.map { item -> + QueueItemModel( + chunkExecutionId = item.chunkExecutionId, + jobId = item.jobId, + executeAfter = item.executeAfter, + jobCharacter = item.jobCharacter, + managementErrorRetrials = item.managementErrorRetrials + ) + } + + logger.debug("Retrieved ${queueModels.size} items from batch job queue") + return CollectionModel.of(queueModels) + } +} + +/** + * Model representing a project batch lock + */ +data class ProjectLockModel( + val projectId: Long, + val lockedJobId: Long?, + val lockStatus: LockStatus, + val jobInfo: JobInfo? +) + +/** + * Information about the locked job + */ +data class JobInfo( + val jobId: Long, + val status: BatchJobStatus, + val type: io.tolgee.batch.data.BatchJobType, + val createdAt: Long? 
+) + +/** + * Status of the project lock + */ +enum class LockStatus { + /** Lock is explicitly cleared (value = 0L) */ + UNLOCKED, + /** Lock has never been initialized (value = null) */ + UNINITIALIZED, + /** Lock is held by a specific job (value = jobId) */ + LOCKED +} + +/** + * Model representing a queue item for batch job chunk execution + */ +data class QueueItemModel( + val chunkExecutionId: Long, + val jobId: Long, + val executeAfter: Long?, + val jobCharacter: JobCharacter, + val managementErrorRetrials: Int +) diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt new file mode 100644 index 0000000000..55a4b5b017 --- /dev/null +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt @@ -0,0 +1,135 @@ +package io.tolgee.api.v2.controllers + +import io.tolgee.batch.BatchJobChunkExecutionQueue +import io.tolgee.batch.BatchJobProjectLockingManager +import io.tolgee.batch.BatchJobService +import io.tolgee.batch.data.ExecutionQueueItem +import io.tolgee.batch.data.BatchJobDto +import io.tolgee.batch.data.BatchJobType +import io.tolgee.development.testDataBuilder.data.AdministrationTestData +import io.tolgee.fixtures.andIsOk +import io.tolgee.fixtures.andIsUnauthorized +import io.tolgee.model.batch.BatchJobStatus +import io.tolgee.testing.AuthorizedControllerTest +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.mockito.kotlin.whenever +import org.springframework.boot.test.mock.mockito.MockBean +import java.util.concurrent.ConcurrentHashMap + +class ProjectBatchLockControllerTest : AuthorizedControllerTest() { + + @MockBean + private lateinit var batchJobProjectLockingManager: BatchJobProjectLockingManager + + @MockBean + private lateinit var batchJobService: BatchJobService + + @MockBean + private lateinit var batchJobChunkExecutionQueue: BatchJobChunkExecutionQueue + + lateinit var testData: AdministrationTestData + + @BeforeEach + fun createData() { + testData = AdministrationTestData() + testDataService.saveTestData(testData.root) + userAccount = testData.admin + } + + @Test + fun `GET project-batch-locks returns unauthorized without super auth`() { + // Test without admin user + userAccount = testData.user + performAuthGet("/v2/administration/project-batch-locks") + .andIsUnauthorized + } + + @Test + fun `GET project-batch-locks returns locks with super auth`() { + val testLocks = ConcurrentHashMap().apply { + put(1L, 123L) // Project 1 locked to job 123 + put(2L, 0L) // Project 2 explicitly unlocked + put(3L, null) // Project 3 uninitialized + } + + whenever(batchJobProjectLockingManager.getMap()).thenReturn(testLocks) + + // Mock job info for locked job + val mockJobDto = BatchJobDto( + id = 123L, + projectId = 1L, + authorId = 1L, + target = emptyList(), + totalItems = 100, + totalChunks = 10, + chunkSize = 10, + status = BatchJobStatus.RUNNING, + type = BatchJobType.MACHINE_TRANSLATE, + params = null, + maxPerJobConcurrency = 1, + jobCharacter = io.tolgee.batch.JobCharacter.FAST, + hidden = false, + debouncingKey = null, + createdAt = System.currentTimeMillis() + ) + + whenever(batchJobService.getJobDto(123L)).thenReturn(mockJobDto) + + performAuthGet("/v2/administration/project-batch-locks") + .andIsOk + } + + @Test + fun `PUT clear project lock works with super auth`() { + val testLocks = ConcurrentHashMap() + 
whenever(batchJobProjectLockingManager.getMap()).thenReturn(testLocks) + + performAuthPut("/v2/administration/project-batch-locks/123/clear", null) + .andIsOk + } + + @Test + fun `DELETE project lock works with super auth`() { + val testLocks = ConcurrentHashMap().apply { + put(123L, 456L) + } + whenever(batchJobProjectLockingManager.getMap()).thenReturn(testLocks) + + performAuthDelete("/v2/administration/project-batch-locks/123") + .andIsOk + } + + @Test + fun `GET batch-job-queue returns queue items with super auth`() { + val queueItems = listOf( + ExecutionQueueItem( + chunkExecutionId = 1001L, + jobId = 2001L, + executeAfter = System.currentTimeMillis(), + jobCharacter = io.tolgee.batch.JobCharacter.FAST, + managementErrorRetrials = 0 + ), + ExecutionQueueItem( + chunkExecutionId = 1002L, + jobId = 2002L, + executeAfter = null, + jobCharacter = io.tolgee.batch.JobCharacter.SLOW, + managementErrorRetrials = 1 + ) + ) + + whenever(batchJobChunkExecutionQueue.getAllQueueItems()).thenReturn(queueItems) + + performAuthGet("/v2/administration/batch-job-queue") + .andIsOk + } + + @Test + fun `GET batch-job-queue returns unauthorized without super auth`() { + // Test without admin user + userAccount = testData.user + performAuthGet("/v2/administration/batch-job-queue") + .andIsUnauthorized + } +} diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt index de567de4dd..33d24a461e 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt @@ -199,4 +199,8 @@ class BatchJobChunkExecutionQueue( fun getQueuedJobItems(jobId: Long): List { return queue.filter { it.jobId == jobId } } + + fun getAllQueueItems(): List { + return queue.toList() + } } From 15e99a26319fd13d1acaedac0f6f0684988ab48f Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 24 Sep 2025 22:09:02 +0200 Subject: [PATCH 06/10] fix: prioritize RUNNING jobs in lock initialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous getInitialJobId() logic created phantom locks by assuming jobs with fewer unlocked chunks were "running", when in reality they were just PENDING jobs with chunks sitting in local queues but never executing. 1. populateQueue() adds chunks to local queues (counted as "locked") 2. getInitialJobId() sees locked chunks → assumes job is "running" 3. Creates phantom lock pointing to PENDING job 4. All subsequent jobs fail project lock check → returned to queue 5. No job ever reaches handleItem() → no job becomes RUNNING 6. Phantom lock persists indefinitely → system deadlocked Now prioritizes database job status over chunk counting: 1. First: Look for jobs with status=RUNNING (truly active jobs) 2. Fallback: Use original chunk-counting logic for edge cases 3. If neither found: Return null (allow new job to acquire lock) This prevents phantom locks from PENDING jobs while preserving protection for legitimately running jobs. The fix should break the current deadlock where 203k+ PENDING jobs are stuck due to phantom project locks. Expected result: Batch jobs will start processing and populate LLM metrics. 
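To make the new decision order concrete, here is a small self-contained sketch of the logic described above. The names (`IncompleteJob`, `resolveInitialLock`, `JobStatus`) are illustrative stand-ins for the real BatchJobService types, not actual Tolgee code:

```kotlin
enum class JobStatus { PENDING, RUNNING }

data class IncompleteJob(
    val jobId: Long,
    val status: JobStatus,
    val totalChunks: Int,
    val unlockedChunks: Int,
)

// Simplified stand-in for the new getInitialJobId() decision order:
// prefer a job the database reports as RUNNING, fall back to chunk counting,
// and otherwise report "no lock needed" instead of creating a phantom lock.
fun resolveInitialLock(jobs: List<IncompleteJob>): Long? {
    jobs.firstOrNull { it.status == JobStatus.RUNNING }?.let { return it.jobId }
    jobs.firstOrNull { it.totalChunks != it.unlockedChunks }?.let { return it.jobId }
    return null
}

fun main() {
    val jobs = listOf(
        // PENDING job whose chunks merely sit in a local queue (wins under pure chunk counting)
        IncompleteJob(jobId = 41, status = JobStatus.PENDING, totalChunks = 10, unlockedChunks = 7),
        // Job the database actually reports as RUNNING
        IncompleteJob(jobId = 42, status = JobStatus.RUNNING, totalChunks = 10, unlockedChunks = 3),
    )
    println(resolveInitialLock(jobs)) // 42 -> the RUNNING job keeps the project lock
    println(resolveInitialLock(emptyList())) // null -> a new job may acquire the lock
}
```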
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../batch/BatchJobProjectLockingManager.kt | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index 492476864d..7bacf827e4 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -134,12 +134,32 @@ class BatchJobProjectLockingManager( private fun getInitialJobId(projectId: Long): Long? { val jobs = batchJobService.getAllIncompleteJobIds(projectId) + + // First priority: Find actually RUNNING jobs from database + // This prevents phantom locks from PENDING jobs that have chunks in queue but aren't executing + val runningJob = jobs.find { job -> + val jobDto = batchJobService.getJobDto(job.jobId) + jobDto.status == io.tolgee.model.batch.BatchJobStatus.RUNNING + } + if (runningJob != null) { + logger.debug("Found RUNNING job ${runningJob.jobId} for project $projectId") + return runningJob.jobId + } + + // Fallback: Use original logic for jobs that have started processing val unlockedChunkCounts = batchJobService .getAllUnlockedChunksForJobs(jobs.map { it.jobId }) .groupBy { it.batchJobId }.map { it.key to it.value.count() }.toMap() // we are looking for a job that has already started and preferably for a locked one - return jobs.find { it.totalChunks != unlockedChunkCounts[it.jobId] }?.jobId ?: jobs.firstOrNull()?.jobId + val startedJob = jobs.find { it.totalChunks != unlockedChunkCounts[it.jobId] } + if (startedJob != null) { + logger.debug("Found started job ${startedJob.jobId} for project $projectId (fallback logic)") + return startedJob.jobId + } + + logger.debug("No RUNNING or started jobs found for project $projectId, allowing new job to acquire lock") + return null } private fun getRedissonProjectLocks(): RMap { From fa299df3a011a3e46b2ffa7bdba74f09ba7ae8b2 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Tue, 30 Sep 2025 12:08:30 +0200 Subject: [PATCH 07/10] Make our own release pipeline Target full public github action to keep the repository a fork of tolgee/tolgee. --- .github/workflows/forky-release.yml | 58 +++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 .github/workflows/forky-release.yml diff --git a/.github/workflows/forky-release.yml b/.github/workflows/forky-release.yml new file mode 100644 index 0000000000..d874215787 --- /dev/null +++ b/.github/workflows/forky-release.yml @@ -0,0 +1,58 @@ +name: Release + +on: + workflow_dispatch: + inputs: + releaseVersion: + description: 'Version to release' + required: true +jobs: + main: + # We want to keep this repository an official fork + # So this needs to stay public... 
+ # Target only public infra (cloud runners, github packages, etc) + runs-on: ubuntu-24.04 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-java@v4 + with: + java-version: 21 + distribution: adopt + + - name: Setup node + uses: actions/setup-node@v3 + with: + node-version: "22.x" + + - name: Install node modules + run: npm ci + + - name: Run get new version + run: npm run release-dry + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: BootJar with version + run: ./gradlew bootJar + env: + VERSION: ${{ github.event.inputs.releaseVersion }} + + - name: Prepare for docker build + run: ./gradlew dockerPrepare + env: + VERSION: ${{ github.event.inputs.releaseVersion }} + + - name: Create docker image + env: + DOCKER_REGISTRY: "ghcr.io" + IMAGE_PATH: "rakutenfrance/tolgee" + run: | + echo ${{ secrets.GITHUB_TOKEN }} | docker login ${{ env.DOCKER_REGISTRY }} -u $ --password-stdin + docker build . -t ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_PATH }}:${{ github.event.inputs.releaseVersion }} --platform linux/amd64 + docker push ${{ env.DOCKER_REGISTRY }}/${{ env.IMAGE_PATH }}:${{ github.event.inputs.releaseVersion }} + working-directory: build/docker + + - name: Pack with webapp + run: ./gradlew packResources + env: + VERSION: ${{ github.event.inputs.releaseVersion }} From 468ac4c4063b75bd42680ee248469f88b5c15a52 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Tue, 30 Sep 2025 15:23:52 +0200 Subject: [PATCH 08/10] feat: implement one-time Redis migration for project batch locks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace old format Map<Long, Long?> with new format Map<Long, Set<Long>> to support configurable concurrent jobs per project. Migration automatically runs on startup and performs one-time data conversion.
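To make the format change concrete, a minimal sketch of the value conversion the startup migration performs: the old per-project value was a nullable job id (null or 0L meaning "no lock"), the new value is a set of locked job ids whose size is capped by the new projectConcurrency property. The `migrateLockValue` name is illustrative, not the actual method name:

```kotlin
// Old Redis format: projectId -> single job id, where null or 0L meant "unlocked".
// New Redis format: projectId -> set of currently locked job ids.
fun migrateLockValue(oldValue: Long?): Set<Long> =
    when (oldValue) {
        null, 0L -> emptySet()   // unlocked / never initialized -> no locked jobs
        else -> setOf(oldValue)  // a single held lock becomes a one-element set
    }

fun main() {
    val oldLocks = mapOf(1L to 123L, 2L to 0L, 3L to null)
    val newLocks = oldLocks.mapValues { (_, jobId) -> migrateLockValue(jobId) }
    println(newLocks) // {1=[123], 2=[], 3=[]}
}
```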
🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../controllers/ProjectBatchLockController.kt | 74 ++++----- .../ProjectBatchLockControllerTest.kt | 14 +- .../io/tolgee/batch/BatchJobTestUtil.kt | 2 +- .../batch/BatchJobConcurrentLauncher.kt | 2 +- .../batch/BatchJobProjectLockingManager.kt | 152 ++++++++++++------ .../configuration/tolgee/BatchProperties.kt | 2 + 6 files changed, 154 insertions(+), 92 deletions(-) diff --git a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt index 603165c019..96ed8762c5 100644 --- a/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt +++ b/backend/api/src/main/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockController.kt @@ -45,42 +45,43 @@ class ProjectBatchLockController( logger.debug("Retrieving all project batch locks") val locks = batchJobProjectLockingManager.getMap() - val lockModels = locks.map { (projectId, lockedJobId) -> - val lockStatus = when (lockedJobId) { - null -> LockStatus.UNINITIALIZED - 0L -> LockStatus.UNLOCKED - else -> LockStatus.LOCKED - } - - val jobInfo = if (lockedJobId != null && lockedJobId > 0L) { - try { - val jobDto = batchJobService.getJobDto(lockedJobId) - JobInfo( - jobId = jobDto.id, - status = jobDto.status, - type = jobDto.type, - createdAt = jobDto.createdAt - ) - } catch (e: Exception) { - logger.warn("Could not retrieve job info for locked job $lockedJobId in project $projectId", e) - null - } - } else { - null - } - - ProjectLockModel( - projectId = projectId, - lockedJobId = lockedJobId, - lockStatus = lockStatus, - jobInfo = jobInfo - ) + val lockModels = locks.entries.map { (projectId, lockedJobIds) -> + createProjectLockModel(projectId, lockedJobIds) } - + logger.debug("Retrieved ${lockModels.size} project batch locks") return CollectionModel.of(lockModels) } + private fun createProjectLockModel(projectId: Long, lockedJobIds: Set): ProjectLockModel { + val lockStatus = when { + lockedJobIds.isEmpty() -> LockStatus.UNLOCKED + else -> LockStatus.LOCKED + } + + val jobInfos = lockedJobIds.mapNotNull { jobId -> + try { + val jobDto = batchJobService.getJobDto(jobId) + JobInfo( + jobId = jobDto.id, + status = jobDto.status, + type = jobDto.type, + createdAt = jobDto.createdAt + ) + } catch (e: Exception) { + logger.warn("Could not retrieve job info for locked job $jobId in project $projectId", e) + null + } + } + + return ProjectLockModel( + projectId = projectId, + lockedJobIds = lockedJobIds, + lockStatus = lockStatus, + jobInfos = jobInfos + ) + } + @GetMapping("/batch-job-queue") @Operation( summary = "Get current batch job queue", @@ -111,9 +112,9 @@ class ProjectBatchLockController( */ data class ProjectLockModel( val projectId: Long, - val lockedJobId: Long?, + val lockedJobIds: Set, val lockStatus: LockStatus, - val jobInfo: JobInfo? 
+ val jobInfos: List ) /** @@ -130,11 +131,10 @@ data class JobInfo( * Status of the project lock */ enum class LockStatus { - /** Lock is explicitly cleared (value = 0L) */ + /** Project lock is explicitly cleared (value = empty set) */ UNLOCKED, - /** Lock has never been initialized (value = null) */ - UNINITIALIZED, - /** Lock is held by a specific job (value = jobId) */ + + /** Project lock is held by one or more jobs (value = set of job IDs) */ LOCKED } diff --git a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt index 55a4b5b017..433d241203 100644 --- a/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt +++ b/backend/app/src/test/kotlin/io/tolgee/api/v2/controllers/ProjectBatchLockControllerTest.kt @@ -47,10 +47,10 @@ class ProjectBatchLockControllerTest : AuthorizedControllerTest() { @Test fun `GET project-batch-locks returns locks with super auth`() { - val testLocks = ConcurrentHashMap().apply { - put(1L, 123L) // Project 1 locked to job 123 - put(2L, 0L) // Project 2 explicitly unlocked - put(3L, null) // Project 3 uninitialized + val testLocks = ConcurrentHashMap>().apply { + put(1L, setOf(123L)) // Project 1 locked to job 123 + put(2L, emptySet()) // Project 2 explicitly unlocked + put(3L, emptySet()) // Project 3 no jobs locked } whenever(batchJobProjectLockingManager.getMap()).thenReturn(testLocks) @@ -82,7 +82,7 @@ class ProjectBatchLockControllerTest : AuthorizedControllerTest() { @Test fun `PUT clear project lock works with super auth`() { - val testLocks = ConcurrentHashMap() + val testLocks = ConcurrentHashMap>() whenever(batchJobProjectLockingManager.getMap()).thenReturn(testLocks) performAuthPut("/v2/administration/project-batch-locks/123/clear", null) @@ -91,8 +91,8 @@ class ProjectBatchLockControllerTest : AuthorizedControllerTest() { @Test fun `DELETE project lock works with super auth`() { - val testLocks = ConcurrentHashMap().apply { - put(123L, 456L) + val testLocks = ConcurrentHashMap>().apply { + put(123L, setOf(456L)) } whenever(batchJobProjectLockingManager.getMap()).thenReturn(testLocks) diff --git a/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt b/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt index 5228abee9f..7b43a9e90a 100644 --- a/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt +++ b/backend/app/src/test/kotlin/io/tolgee/batch/BatchJobTestUtil.kt @@ -409,7 +409,7 @@ class BatchJobTestUtil( fun verifyProjectJobLockReleased() { waitFor(pollTime = 200, timeout = 1000) { - batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id) == 0L + batchJobProjectLockingManager.getLockedForProject(testData.projectBuilder.self.id).isEmpty() } } diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt index 10f0ff580b..5ced47e9f2 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt @@ -173,7 +173,7 @@ class BatchJobConcurrentLauncher( } /** - * Only single job can run in project at the same time + * There is a project level lock with configurable n concurrent locks allowed. 
*/ if (!batchJobProjectLockingManager.canLockJobForProject(executionItem.jobId)) { logger.debug( diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index 7bacf827e4..b751f04846 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -2,15 +2,18 @@ package io.tolgee.batch import io.tolgee.batch.data.BatchJobDto import io.tolgee.component.UsingRedisProvider +import io.tolgee.configuration.tolgee.BatchProperties import io.tolgee.util.Logging import io.tolgee.util.logger -import org.redisson.api.RMap import org.redisson.api.RedissonClient +import org.springframework.beans.factory.InitializingBean import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Component import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentMap +private const val REDIS_PROJECT_BATCH_JOB_LOCKS_KEY = "project_batch_job_locks" + /** * Only single job can be executed at the same time for one project. * @@ -19,13 +22,14 @@ import java.util.concurrent.ConcurrentMap @Component class BatchJobProjectLockingManager( private val batchJobService: BatchJobService, + private val batchProperties: BatchProperties, @Lazy private val redissonClient: RedissonClient, private val usingRedisProvider: UsingRedisProvider, -) : Logging { +) : Logging, InitializingBean { companion object { private val localProjectLocks by lazy { - ConcurrentHashMap() + ConcurrentHashMap>() } } @@ -51,18 +55,20 @@ class BatchJobProjectLockingManager( jobId: Long, ) { projectId ?: return - getMap().compute(projectId) { _, lockedJobId -> + getMap().compute(projectId) { _, lockedJobIds -> logger.debug("Unlocking job: $jobId for project $projectId") - if (lockedJobId == jobId) { + val currentJobs = lockedJobIds ?: emptySet() + if (currentJobs.contains(jobId)) { logger.debug("Unlocking job: $jobId for project $projectId") - return@compute 0L + val updatedJobs = currentJobs - jobId + return@compute updatedJobs.ifEmpty { emptySet() } } logger.debug("Job: $jobId for project $projectId is not locked") - return@compute lockedJobId + return@compute currentJobs } } - fun getMap(): ConcurrentMap { + fun getMap(): ConcurrentMap> { if (usingRedisProvider.areWeUsingRedis) { return getRedissonProjectLocks() } @@ -71,65 +77,82 @@ class BatchJobProjectLockingManager( private fun tryLockWithRedisson(batchJobDto: BatchJobDto): Boolean { val projectId = batchJobDto.projectId ?: return true - val computed = - getRedissonProjectLocks().compute(projectId) { _, value -> - computeFnBody(batchJobDto, value) - } - return computed == batchJobDto.id + val computedJobIds = + getRedissonProjectLocks().compute(projectId) { _, lockedJobIds -> + val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet()) + logger.debug( + "While trying to lock on redis {} for project {} new lock value is {}", + batchJobDto.id, + batchJobDto.projectId, + newLockedJobIds + ) + newLockedJobIds + } ?: emptySet() + return computedJobIds.contains(batchJobDto.id) } - fun getLockedForProject(projectId: Long): Long? 
{ + fun getLockedForProject(projectId: Long): Set { if (usingRedisProvider.areWeUsingRedis) { - return getRedissonProjectLocks()[projectId] + return getRedissonProjectLocks()[projectId] ?: emptySet() } - return localProjectLocks[projectId] + return localProjectLocks[projectId] ?: emptySet() } - private fun tryLockLocal(toLock: BatchJobDto): Boolean { - val projectId = toLock.projectId ?: return true - val computed = - localProjectLocks.compute(projectId) { _, value -> - val newLocked = computeFnBody(toLock, value) - logger.debug("While trying to lock ${toLock.id} for project ${toLock.projectId} new lock value is $newLocked") - newLocked - } - return computed == toLock.id + private fun tryLockLocal(batchJobDto: BatchJobDto): Boolean { + val projectId = batchJobDto.projectId ?: return true + val computedJobIds = + localProjectLocks.compute(projectId) { _, lockedJobIds -> + val newLockedJobIds = computeFnBody(batchJobDto, lockedJobIds ?: emptySet()) + logger.debug( + "While trying to lock locally {} for project {} new lock value is {}", + batchJobDto.id, + batchJobDto.projectId, + newLockedJobIds + ) + newLockedJobIds + } ?: emptySet() + return computedJobIds.contains(batchJobDto.id) } private fun computeFnBody( toLock: BatchJobDto, - currentValue: Long?, - ): Long { + lockedJobIds: Set, + ): Set { val projectId = toLock.projectId ?: throw IllegalStateException( "Project id is required. " + "Locking for project should not happen for non-project jobs.", ) - // nothing is locked - if (currentValue == 0L) { - logger.debug("Locking job ${toLock.id} for project ${toLock.projectId}, nothing is locked") - return toLock.id - } - // value for the project is not initialized yet - if (currentValue == null) { + // nothing is locked + if (lockedJobIds.isEmpty()) { logger.debug("Getting initial locked state from DB state") // we have to find out from database if there is any running job for the project - val initial = getInitialJobId(projectId) - logger.debug("Initial locked job $initial for project ${toLock.projectId}") - if (initial == null) { - logger.debug("No job found, locking ${toLock.id}") - return toLock.id + val initialJobId = getInitialJobId(projectId) + logger.info("Initial locked job $initialJobId for project ${toLock.projectId}") + if (initialJobId == null) { + logger.debug("No initial job found, locking only ${toLock.id}") + return setOf(toLock.id) } + val newLockedJobIds = mutableSetOf(initialJobId) + if (newLockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") + newLockedJobIds.add(toLock.id) + } else { + logger.debug("Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds") + } + return newLockedJobIds + } - logger.debug("Job found, locking $initial") - return initial + // standard case - there are some jobs locked + if (lockedJobIds.size < batchProperties.projectConcurrency) { + logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $lockedJobIds") + return lockedJobIds + toLock.id } - logger.debug("Job $currentValue is locked for project ${toLock.projectId}") // if we cannot lock, we are returning current value - return currentValue + return lockedJobIds } private fun getInitialJobId(projectId: Long): Long? 
{ @@ -162,11 +185,48 @@ class BatchJobProjectLockingManager( return null } - private fun getRedissonProjectLocks(): RMap { - return redissonClient.getMap("project_batch_job_locks") + private fun getRedissonProjectLocks(): ConcurrentMap> { + return redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + } + + override fun afterPropertiesSet() { + // This runs first to check if redis has a map of the old format. + // If so, we migrate it to the new format. + if (!usingRedisProvider.areWeUsingRedis) { + logger.debug("Not using Redis, skipping migration check") + return + } + + val redisProjectBatchJobLocks = redissonClient.getMap(REDIS_PROJECT_BATCH_JOB_LOCKS_KEY) + val isOldFormat = redisProjectBatchJobLocks.values.any { it is Long || it == null } + if (!isOldFormat) { + logger.debug("Redis project locks are in new format, no migration needed") + return + } + + logger.info("Starting migration of project locks from old format (v1) to new format (v2)") + // First, copy all data from Redis to local memory + val localCopy = mutableMapOf>() + redisProjectBatchJobLocks.forEach { (projectId, jobId) -> + val jobSet = when (jobId) { + null, 0L -> emptySet() + else -> setOf(jobId as Long) + } + localCopy[projectId] = jobSet + } + logger.info("Copied ${localCopy.size} project locks from old format to local memory") + + // Write all data back in new format (this will overwrite the old format) + val newMap = getRedissonProjectLocks() + localCopy.forEach { (projectId, jobSet) -> + newMap[projectId] = jobSet + } + + logger.info("Successfully migrated ${newMap.size} project locks from local memory to new format") } fun getLockedJobIds(): Set { - return getMap().values.filterNotNull().toSet() + return getMap().values.flatten().toSet() } + } diff --git a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt index c698fa0f90..914c0e00e4 100644 --- a/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt +++ b/backend/data/src/main/kotlin/io/tolgee/configuration/tolgee/BatchProperties.kt @@ -8,4 +8,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties class BatchProperties { @DocProperty(description = "How many parallel jobs can be run at once on single Tolgee instance") var concurrency: Int = 1 + @DocProperty(description = "How many parallel jobs can be run at once per project across all Tolgee instances") + var projectConcurrency: Int = 1 } From dbc0d62cac8f13421df93df433949aa8120aa9f4 Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 1 Oct 2025 10:55:55 +0200 Subject: [PATCH 09/10] feat: log at info level for taken project locks --- .../kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt index b751f04846..fdfa9607bf 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/BatchJobProjectLockingManager.kt @@ -132,12 +132,12 @@ class BatchJobProjectLockingManager( val initialJobId = getInitialJobId(projectId) logger.info("Initial locked job $initialJobId for project ${toLock.projectId}") if (initialJobId == null) { - logger.debug("No initial job found, locking only ${toLock.id}") + logger.info("No initial job found, locking only 
${toLock.id}") return setOf(toLock.id) } val newLockedJobIds = mutableSetOf(initialJobId) if (newLockedJobIds.size < batchProperties.projectConcurrency) { - logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") + logger.info("Locking job ${toLock.id} for project $projectId. Active jobs before: $newLockedJobIds") newLockedJobIds.add(toLock.id) } else { logger.debug("Cannot lock job ${toLock.id} for project $projectId, limit reached. Active jobs: $newLockedJobIds") @@ -147,7 +147,7 @@ class BatchJobProjectLockingManager( // standard case - there are some jobs locked if (lockedJobIds.size < batchProperties.projectConcurrency) { - logger.debug("Locking job ${toLock.id} for project $projectId. Active jobs before: $lockedJobIds") + logger.info("Locking job ${toLock.id} for project $projectId. Active jobs before: $lockedJobIds") return lockedJobIds + toLock.id } From da44146dc72f8d1a98faaf5c0c7e7dcd76080c8f Mon Sep 17 00:00:00 2001 From: Arnaud Jeansen Date: Wed, 1 Oct 2025 11:54:16 +0200 Subject: [PATCH 10/10] Disable very verbose warn log (when a lot of jobs exist) --- .../kotlin/io/tolgee/batch/cleaning/ScheduledJobCleaner.kt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/data/src/main/kotlin/io/tolgee/batch/cleaning/ScheduledJobCleaner.kt b/backend/data/src/main/kotlin/io/tolgee/batch/cleaning/ScheduledJobCleaner.kt index 62728c15a5..b72502c33a 100644 --- a/backend/data/src/main/kotlin/io/tolgee/batch/cleaning/ScheduledJobCleaner.kt +++ b/backend/data/src/main/kotlin/io/tolgee/batch/cleaning/ScheduledJobCleaner.kt @@ -45,8 +45,10 @@ class ScheduledJobCleaner( } private fun handleStuckJobs() { - batchJobService.getStuckJobIds(batchJobStateProvider.getCachedJobIds()).forEach { - logger.warn("Removing stuck job state it using scheduled task") + val stuckJobIds = batchJobService.getStuckJobIds(batchJobStateProvider.getCachedJobIds()) + logger.warn("Removing stuck job state using scheduled task for ${stuckJobIds.size} jobs") + stuckJobIds.forEach { + logger.trace("Removing stuck job state using scheduled task for job $it") batchJobStateProvider.removeJobState(it) } }