Step 2.1: Kritische Repositories (3 Tasks)
This commit is contained in:
parent
10a8cfa72b
commit
29675f9ff4
3 changed files with 127 additions and 70 deletions
|
|
@ -1,5 +1,6 @@
|
|||
package de.avatic.lcc.repositories.bulk;
|
||||
|
||||
import de.avatic.lcc.database.dialect.SqlDialectProvider;
|
||||
import de.avatic.lcc.dto.bulk.BulkFileType;
|
||||
import de.avatic.lcc.dto.bulk.BulkOperationState;
|
||||
import de.avatic.lcc.dto.bulk.BulkProcessingType;
|
||||
|
|
@ -24,9 +25,11 @@ public class BulkOperationRepository {
|
|||
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final SqlDialectProvider dialectProvider;
|
||||
|
||||
public BulkOperationRepository(JdbcTemplate jdbcTemplate) {
|
||||
public BulkOperationRepository(JdbcTemplate jdbcTemplate, SqlDialectProvider dialectProvider) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.dialectProvider = dialectProvider;
|
||||
}
|
||||
|
||||
@Transactional
|
||||
|
|
@ -68,41 +71,42 @@ public class BulkOperationRepository {
|
|||
public void removeOld(Integer userId) {
|
||||
// First, update sys_error records to set bulk_operation_id to NULL
|
||||
// for bulk operations that will be deleted (all but the 10 newest for the current user)
|
||||
String updateErrorsSql = """
|
||||
UPDATE sys_error
|
||||
SET bulk_operation_id = NULL
|
||||
WHERE bulk_operation_id IN (
|
||||
SELECT id FROM (
|
||||
SELECT id
|
||||
FROM bulk_operation
|
||||
WHERE user_id = ?
|
||||
AND state NOT IN ('SCHEDULED', 'PROCESSING')
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 18446744073709551615 OFFSET 10
|
||||
) AS old_operations
|
||||
)
|
||||
""";
|
||||
|
||||
jdbcTemplate.update(updateErrorsSql, userId);
|
||||
// Build subquery to get the 10 newest operations
|
||||
String newestSubquery = "SELECT id FROM bulk_operation WHERE user_id = ? AND state NOT IN ('SCHEDULED', 'PROCESSING') ORDER BY created_at DESC ";
|
||||
String newestWithLimit = newestSubquery + dialectProvider.buildPaginationClause(10, 0);
|
||||
Object[] paginationParams = dialectProvider.getPaginationParameters(10, 0);
|
||||
|
||||
String updateErrorsSql = String.format("""
|
||||
UPDATE sys_error
|
||||
SET bulk_operation_id = NULL
|
||||
WHERE bulk_operation_id IN (
|
||||
SELECT id FROM bulk_operation
|
||||
WHERE user_id = ?
|
||||
AND state NOT IN ('SCHEDULED', 'PROCESSING')
|
||||
AND id NOT IN (
|
||||
%s
|
||||
)
|
||||
)
|
||||
""", newestWithLimit);
|
||||
|
||||
// Combine params: userId for outer query, userId + pagination params for subquery
|
||||
Object[] updateParams = new Object[]{userId, userId, paginationParams[0], paginationParams[1]};
|
||||
jdbcTemplate.update(updateErrorsSql, updateParams);
|
||||
|
||||
// Then delete the old bulk_operation entries (keeping only the 10 newest for the current user)
|
||||
String deleteBulkSql = """
|
||||
DELETE FROM bulk_operation
|
||||
WHERE user_id = ?
|
||||
String deleteBulkSql = String.format("""
|
||||
DELETE FROM bulk_operation
|
||||
WHERE user_id = ?
|
||||
AND state NOT IN ('SCHEDULED', 'PROCESSING')
|
||||
AND id NOT IN (
|
||||
SELECT id FROM (
|
||||
SELECT id
|
||||
FROM bulk_operation
|
||||
WHERE user_id = ?
|
||||
AND state NOT IN ('SCHEDULED', 'PROCESSING')
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 10
|
||||
) AS newest_operations
|
||||
%s
|
||||
)
|
||||
""";
|
||||
""", newestWithLimit);
|
||||
|
||||
jdbcTemplate.update(deleteBulkSql, userId, userId);
|
||||
// Combine params: userId for WHERE clause, userId + pagination params for subquery
|
||||
Object[] deleteParams = new Object[]{userId, userId, paginationParams[0], paginationParams[1]};
|
||||
jdbcTemplate.update(deleteBulkSql, deleteParams);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
|
|
@ -121,22 +125,33 @@ public class BulkOperationRepository {
|
|||
|
||||
cleanupTimeouts(userId);
|
||||
|
||||
String sql = """
|
||||
String baseQuery = """
|
||||
SELECT id, user_id, bulk_file_type, bulk_processing_type, state, created_at, validity_period_id
|
||||
FROM bulk_operation
|
||||
WHERE user_id = ?
|
||||
|
||||
ORDER BY created_at DESC LIMIT 10
|
||||
ORDER BY created_at DESC
|
||||
""";
|
||||
|
||||
return jdbcTemplate.query(sql, new BulkOperationRowMapper(true), userId);
|
||||
String sql = baseQuery + dialectProvider.buildPaginationClause(10, 0);
|
||||
Object[] paginationParams = dialectProvider.getPaginationParameters(10, 0);
|
||||
|
||||
// Combine userId with pagination params
|
||||
Object[] allParams = new Object[]{userId, paginationParams[0], paginationParams[1]};
|
||||
|
||||
return jdbcTemplate.query(sql, new BulkOperationRowMapper(true), allParams);
|
||||
}
|
||||
|
||||
private void cleanupTimeouts(Integer userId) {
|
||||
|
||||
String sql = """
|
||||
UPDATE bulk_operation SET state = 'EXCEPTION' WHERE user_id = ? AND (state = 'PROCESSING' OR state = 'SCHEDULED') AND created_at < NOW() - INTERVAL 60 MINUTE
|
||||
""";
|
||||
// Build date subtraction expression (60 minutes ago)
|
||||
String dateCondition = dialectProvider.buildDateSubtraction(null, "60", SqlDialectProvider.DateUnit.MINUTE);
|
||||
|
||||
String sql = String.format("""
|
||||
UPDATE bulk_operation SET state = 'EXCEPTION'
|
||||
WHERE user_id = ?
|
||||
AND (state = 'PROCESSING' OR state = 'SCHEDULED')
|
||||
AND created_at < %s
|
||||
""", dateCondition);
|
||||
|
||||
jdbcTemplate.update(sql, userId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package de.avatic.lcc.repositories.calculation;
|
||||
|
||||
import de.avatic.lcc.database.dialect.SqlDialectProvider;
|
||||
import de.avatic.lcc.model.db.calculations.CalculationJob;
|
||||
import de.avatic.lcc.model.db.calculations.CalculationJobPriority;
|
||||
import de.avatic.lcc.model.db.calculations.CalculationJobState;
|
||||
|
|
@ -18,9 +19,11 @@ import java.util.Optional;
|
|||
@Repository
|
||||
public class CalculationJobRepository {
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final SqlDialectProvider dialectProvider;
|
||||
|
||||
public CalculationJobRepository(JdbcTemplate jdbcTemplate) {
|
||||
public CalculationJobRepository(JdbcTemplate jdbcTemplate, SqlDialectProvider dialectProvider) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.dialectProvider = dialectProvider;
|
||||
}
|
||||
|
||||
@Transactional
|
||||
|
|
@ -63,23 +66,31 @@ public class CalculationJobRepository {
|
|||
*/
|
||||
@Transactional
|
||||
public Optional<CalculationJob> fetchAndLockNextJob() {
|
||||
String sql = """
|
||||
// Build base query with ORDER BY (required for OFFSET/FETCH in MSSQL)
|
||||
String baseQuery = """
|
||||
SELECT * FROM calculation_job
|
||||
WHERE (job_state = 'CREATED')
|
||||
OR (job_state = 'EXCEPTION' AND retries < 3)
|
||||
ORDER BY
|
||||
CASE
|
||||
CASE
|
||||
WHEN job_state = 'CREATED' AND priority = 'HIGH' THEN 1
|
||||
WHEN job_state = 'CREATED' AND priority = 'MEDIUM' THEN 2
|
||||
WHEN job_state = 'CREATED' AND priority = 'LOW' THEN 3
|
||||
WHEN job_state = 'EXCEPTION' THEN 4
|
||||
END,
|
||||
calculation_date
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
""";
|
||||
""";
|
||||
|
||||
var jobs = jdbcTemplate.query(sql, new CalculationJobMapper());
|
||||
// Add pagination (LIMIT 1 OFFSET 0)
|
||||
String paginatedQuery = baseQuery + " " + dialectProvider.buildPaginationClause(1, 0);
|
||||
|
||||
// Add pessimistic locking with skip locked
|
||||
String sql = dialectProvider.buildSelectForUpdateSkipLocked(paginatedQuery);
|
||||
|
||||
// Get pagination parameters in correct order for the database
|
||||
Object[] params = dialectProvider.getPaginationParameters(1, 0);
|
||||
|
||||
var jobs = jdbcTemplate.query(sql, new CalculationJobMapper(), params);
|
||||
|
||||
if (jobs.isEmpty()) {
|
||||
return Optional.empty();
|
||||
|
|
@ -151,9 +162,14 @@ public class CalculationJobRepository {
|
|||
public Optional<CalculationJob> getCalculationJobWithJobStateValid(Integer periodId, Integer setId, Integer nodeId, Integer materialId) {
|
||||
|
||||
/* there should only be one job per period id, node id and material id combination */
|
||||
String query = "SELECT * FROM calculation_job AS cj INNER JOIN premise AS p ON cj.premise_id = p.id WHERE job_state = 'VALID' AND validity_period_id = ? AND property_set_id = ? AND p.supplier_node_id = ? AND material_id = ? ORDER BY cj.calculation_date DESC LIMIT 1";
|
||||
String baseQuery = "SELECT * FROM calculation_job AS cj INNER JOIN premise AS p ON cj.premise_id = p.id WHERE job_state = 'VALID' AND validity_period_id = ? AND property_set_id = ? AND p.supplier_node_id = ? AND material_id = ? ORDER BY cj.calculation_date DESC ";
|
||||
String query = baseQuery + dialectProvider.buildPaginationClause(1, 0);
|
||||
Object[] params = dialectProvider.getPaginationParameters(1, 0);
|
||||
|
||||
var job = jdbcTemplate.query(query, new CalculationJobMapper(), periodId, setId, nodeId, materialId);
|
||||
// Combine business logic params with pagination params
|
||||
Object[] allParams = new Object[]{periodId, setId, nodeId, materialId, params[0], params[1]};
|
||||
|
||||
var job = jdbcTemplate.query(query, new CalculationJobMapper(), allParams);
|
||||
|
||||
if (job.isEmpty())
|
||||
return Optional.empty();
|
||||
|
|
@ -165,9 +181,14 @@ public class CalculationJobRepository {
|
|||
public Optional<CalculationJob> getCalculationJobWithJobStateValidUserNodeId(Integer periodId, Integer setId, Integer userNodeId, Integer materialId) {
|
||||
|
||||
/* there should only be one job per period id, node id and material id combination */
|
||||
String query = "SELECT * FROM calculation_job AS cj INNER JOIN premise AS p ON cj.premise_id = p.id WHERE job_state = 'VALID' AND validity_period_id = ? AND property_set_id = ? AND p.user_supplier_node_id = ? AND material_id = ? ORDER BY cj.calculation_date DESC LIMIT 1";
|
||||
String baseQuery = "SELECT * FROM calculation_job AS cj INNER JOIN premise AS p ON cj.premise_id = p.id WHERE job_state = 'VALID' AND validity_period_id = ? AND property_set_id = ? AND p.user_supplier_node_id = ? AND material_id = ? ORDER BY cj.calculation_date DESC ";
|
||||
String query = baseQuery + dialectProvider.buildPaginationClause(1, 0);
|
||||
Object[] params = dialectProvider.getPaginationParameters(1, 0);
|
||||
|
||||
var job = jdbcTemplate.query(query, new CalculationJobMapper(), periodId, setId, userNodeId, materialId);
|
||||
// Combine business logic params with pagination params
|
||||
Object[] allParams = new Object[]{periodId, setId, userNodeId, materialId, params[0], params[1]};
|
||||
|
||||
var job = jdbcTemplate.query(query, new CalculationJobMapper(), allParams);
|
||||
|
||||
if (job.isEmpty())
|
||||
return Optional.empty();
|
||||
|
|
@ -211,8 +232,14 @@ public class CalculationJobRepository {
|
|||
@Transactional
|
||||
public CalculationJobState getLastStateFor(Integer premiseId) {
|
||||
|
||||
String sql = "SELECT job_state FROM calculation_job WHERE premise_id = ? ORDER BY calculation_date DESC LIMIT 1";
|
||||
var result = jdbcTemplate.query(sql, (rs, rowNum) -> CalculationJobState.valueOf(rs.getString("job_state")), premiseId);
|
||||
String baseQuery = "SELECT job_state FROM calculation_job WHERE premise_id = ? ORDER BY calculation_date DESC ";
|
||||
String sql = baseQuery + dialectProvider.buildPaginationClause(1, 0);
|
||||
Object[] params = dialectProvider.getPaginationParameters(1, 0);
|
||||
|
||||
// Combine business logic params with pagination params
|
||||
Object[] allParams = new Object[]{premiseId, params[0], params[1]};
|
||||
|
||||
var result = jdbcTemplate.query(sql, (rs, rowNum) -> CalculationJobState.valueOf(rs.getString("job_state")), allParams);
|
||||
|
||||
if (result.isEmpty())
|
||||
return null;
|
||||
|
|
@ -227,9 +254,13 @@ public class CalculationJobRepository {
|
|||
|
||||
public Integer getFailedJobByUserId(Integer userId) {
|
||||
|
||||
String sql = "SELECT COUNT(*) FROM calculation_job WHERE user_id = ? AND job_state = 'EXCEPTION' AND calculation_date > DATE_SUB(NOW(), INTERVAL 3 DAY)";
|
||||
|
||||
// Build date subtraction expression using dialect provider
|
||||
String dateCondition = dialectProvider.buildDateSubtraction(null, "3", SqlDialectProvider.DateUnit.DAY);
|
||||
|
||||
String sql = String.format(
|
||||
"SELECT COUNT(*) FROM calculation_job WHERE user_id = ? AND job_state = 'EXCEPTION' AND calculation_date > %s",
|
||||
dateCondition
|
||||
);
|
||||
|
||||
return jdbcTemplate.queryForObject(sql, Integer.class, userId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package de.avatic.lcc.repositories.rates;
|
||||
|
||||
import de.avatic.lcc.database.dialect.SqlDialectProvider;
|
||||
import de.avatic.lcc.dto.generic.TransportType;
|
||||
import de.avatic.lcc.model.db.rates.ContainerRate;
|
||||
import de.avatic.lcc.model.db.rates.ValidityPeriodState;
|
||||
|
|
@ -13,6 +14,7 @@ import org.springframework.transaction.annotation.Transactional;
|
|||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
|
@ -21,9 +23,11 @@ import java.util.Optional;
|
|||
public class ContainerRateRepository {
|
||||
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
private final SqlDialectProvider dialectProvider;
|
||||
|
||||
public ContainerRateRepository(JdbcTemplate jdbcTemplate) {
|
||||
public ContainerRateRepository(JdbcTemplate jdbcTemplate, SqlDialectProvider dialectProvider) {
|
||||
this.jdbcTemplate = jdbcTemplate;
|
||||
this.dialectProvider = dialectProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -74,9 +78,12 @@ public class ContainerRateRepository {
|
|||
}
|
||||
}
|
||||
|
||||
queryBuilder.append(" ORDER BY cr.id LIMIT ? OFFSET ?");
|
||||
params.add(pagination.getLimit());
|
||||
params.add(pagination.getOffset());
|
||||
queryBuilder.append(" ORDER BY cr.id ");
|
||||
queryBuilder.append(dialectProvider.buildPaginationClause(pagination.getLimit(), pagination.getOffset()));
|
||||
|
||||
Object[] paginationParams = dialectProvider.getPaginationParameters(pagination.getLimit(), pagination.getOffset());
|
||||
params.add(paginationParams[0]);
|
||||
params.add(paginationParams[1]);
|
||||
|
||||
Integer totalCount = jdbcTemplate.queryForObject(countQueryBuilder.toString(), Integer.class, countParams.toArray());
|
||||
var results = jdbcTemplate.query(queryBuilder.toString(), new ContainerRateMapper(), params.toArray());
|
||||
|
|
@ -213,17 +220,17 @@ public class ContainerRateRepository {
|
|||
|
||||
@Transactional
|
||||
public void insert(ContainerRate containerRate) {
|
||||
String sql = """
|
||||
INSERT INTO container_rate
|
||||
(from_node_id, to_node_id, container_rate_type, rate_teu, rate_feu, rate_hc, lead_time, validity_period_id)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON DUPLICATE KEY UPDATE
|
||||
container_rate_type = VALUES(container_rate_type),
|
||||
rate_teu = VALUES(rate_teu),
|
||||
rate_feu = VALUES(rate_feu),
|
||||
rate_hc = VALUES(rate_hc),
|
||||
lead_time = VALUES(lead_time)
|
||||
""";
|
||||
// Build UPSERT statement using dialect provider
|
||||
List<String> uniqueColumns = Arrays.asList("from_node_id", "to_node_id", "container_rate_type", "validity_period_id");
|
||||
List<String> insertColumns = Arrays.asList("from_node_id", "to_node_id", "container_rate_type", "rate_teu", "rate_feu", "rate_hc", "lead_time", "validity_period_id");
|
||||
List<String> updateColumns = Arrays.asList("container_rate_type", "rate_teu", "rate_feu", "rate_hc", "lead_time");
|
||||
|
||||
String sql = dialectProvider.buildUpsertStatement(
|
||||
"container_rate",
|
||||
uniqueColumns,
|
||||
insertColumns,
|
||||
updateColumns
|
||||
);
|
||||
|
||||
jdbcTemplate.update(sql,
|
||||
containerRate.getFromNodeId(),
|
||||
|
|
@ -259,7 +266,11 @@ public class ContainerRateRepository {
|
|||
|
||||
@Transactional
|
||||
public void copyCurrentToDraft() {
|
||||
String sql = """
|
||||
// Build LIMIT clause for subquery
|
||||
String limitClause = dialectProvider.buildPaginationClause(1, 0);
|
||||
Object[] paginationParams = dialectProvider.getPaginationParameters(1, 0);
|
||||
|
||||
String sql = String.format("""
|
||||
INSERT INTO container_rate (
|
||||
from_node_id,
|
||||
to_node_id,
|
||||
|
|
@ -278,13 +289,13 @@ public class ContainerRateRepository {
|
|||
cr.rate_feu,
|
||||
cr.rate_hc,
|
||||
cr.lead_time,
|
||||
(SELECT id FROM validity_period WHERE state = 'DRAFT' LIMIT 1) as validity_period_id
|
||||
(SELECT id FROM validity_period WHERE state = 'DRAFT' %s) as validity_period_id
|
||||
FROM container_rate cr
|
||||
INNER JOIN validity_period vp ON cr.validity_period_id = vp.id
|
||||
WHERE vp.state = 'VALID'
|
||||
""";
|
||||
""", limitClause);
|
||||
|
||||
jdbcTemplate.update(sql);
|
||||
jdbcTemplate.update(sql, paginationParams);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue