Enhanced bulk processing with executor updates and timeout handling:
- **Backend**: Added `bulkProcessingExecutor` with advanced configuration (thread naming, queue handling, timeout behavior). Updated `BulkOperationController` with monitoring endpoint to expose executor stats. Enhanced `BulkOperationExecutionService` with timeout handling and error logging for bulk operation processes. - **Frontend**: Enabled `allowFreeInput` in `AutoSuggestSearchBar.vue` for enhanced customization. Adjusted date handling across multiple components for improved consistency and formatting. - **Database**: Introduced `timeout` logic in `BulkOperationRepository` to handle stale processing states.
This commit is contained in:
parent
d9229b3d73
commit
1b1356e590
11 changed files with 136 additions and 42 deletions
|
|
@ -8,6 +8,7 @@
|
|||
@input="onInput"
|
||||
@keydown="onKeyDown"
|
||||
@focus="onFocus"
|
||||
@blur="onBlur"
|
||||
type="text"
|
||||
class="search-input"
|
||||
:placeholder="placeholder"
|
||||
|
|
@ -62,7 +63,7 @@
|
|||
|
||||
|
||||
<div
|
||||
v-if="showSuggestions && searchQuery && suggestions.length === 0 && !isLoading"
|
||||
v-if="showSuggestions && searchQuery && suggestions.length === 0 && !isLoading && !allowFreeInput"
|
||||
class="no-results"
|
||||
>
|
||||
{{ noResultsText.replace('{query}', searchQuery) }}
|
||||
|
|
@ -135,6 +136,10 @@ export default {
|
|||
activateWatcher: {
|
||||
type: Boolean,
|
||||
default: false,
|
||||
},
|
||||
allowFreeInput: {
|
||||
type: Boolean,
|
||||
default: false
|
||||
}
|
||||
},
|
||||
|
||||
|
|
@ -192,8 +197,23 @@ export default {
|
|||
}
|
||||
},
|
||||
|
||||
onBlur() {
|
||||
// Verzögerung, damit Click-Event auf Suggestion noch registriert wird
|
||||
setTimeout(() => {
|
||||
if (this.allowFreeInput && this.searchQuery.trim()) {
|
||||
// Emit search event mit freiem Text
|
||||
this.$emit('search', this.searchQuery)
|
||||
}
|
||||
}, 200)
|
||||
},
|
||||
|
||||
onKeyDown(event) {
|
||||
if (!this.showSuggestions || this.suggestions.length === 0) {
|
||||
// Wenn allowFreeInput aktiviert ist, erlaube Enter auch ohne Vorschläge
|
||||
if (event.key === 'Enter' && this.allowFreeInput) {
|
||||
event.preventDefault()
|
||||
this.handleEnterWithoutSelection()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -236,18 +256,26 @@ export default {
|
|||
this.searchQuery = '';
|
||||
} else {
|
||||
this.searchQuery = this.getTitleFor(suggestion)
|
||||
|
||||
}
|
||||
this.hideSuggestions()
|
||||
this.$emit('selected', suggestion)
|
||||
this.$emit('search', this.searchQuery)
|
||||
this.$refs.searchInput.blur()
|
||||
|
||||
},
|
||||
|
||||
handleEnterWithoutSelection() {
|
||||
const query = this.searchQuery.trim()
|
||||
|
||||
// Wenn allowFreeInput deaktiviert ist und keine Vorschläge vorhanden sind, nichts tun
|
||||
if (!this.allowFreeInput && this.suggestions.length === 0) {
|
||||
return
|
||||
}
|
||||
|
||||
this.hideSuggestions()
|
||||
this.$emit('search', this.searchQuery)
|
||||
|
||||
if (query) {
|
||||
this.$emit('search', query)
|
||||
}
|
||||
},
|
||||
|
||||
hideSuggestions() {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
</div>
|
||||
<div class="bulk-operation-info">
|
||||
<div>{{ operation.processing_type.toLowerCase() }} {{ operation.file_type.toLowerCase() }}</div>
|
||||
<div class="bulk-operation-date">{{ buildDate(operation.timestamp) }}</div>
|
||||
<div class="bulk-operation-date">{{ operation.timestamp }}</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="bulk-operation-status">
|
||||
|
|
|
|||
|
|
@ -132,8 +132,9 @@ export default {
|
|||
methods: {
|
||||
buildDate(date) {
|
||||
if(date === null) return "not set";
|
||||
return date;
|
||||
|
||||
return `${date[0]}-${date[1].toString().padStart(2, '0')}-${date[2].toString().padStart(2, '0')} ${date[3]?.toString().padStart(2, '0') ?? '00'}:${date[4]?.toString().padStart(2, '0') ?? '00'}:${date[5]?.toString().padStart(2, '0') ?? '00'}`
|
||||
// return `${date[0]}-${date[1].toString().padStart(2, '0')}-${date[2].toString().padStart(2, '0')} ${date[3]?.toString().padStart(2, '0') ?? '00'}:${date[4]?.toString().padStart(2, '0') ?? '00'}:${date[5]?.toString().padStart(2, '0') ?? '00'}`
|
||||
},
|
||||
async saveProperty(property) {
|
||||
this.countryStore.setProperty(property);
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@
|
|||
<div class="hs-code-container">
|
||||
<autosuggest-searchbar ref="hsCodeSearchbar" :activate-watcher="true" :fetch-suggestions="fetchHsCode"
|
||||
:initial-value="hsCode"
|
||||
@selected="hsCodeSelected" @blur="hsCodeChanged"
|
||||
@selected="hsCodeSelected" @blur="hsCodeChanged" :allow-free-input="true"
|
||||
placeholder="Find hs code" no-results-text="Not found."></autosuggest-searchbar>
|
||||
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -81,7 +81,8 @@ export default {
|
|||
buildDate(date) {
|
||||
if(date === null) return "not set";
|
||||
|
||||
return `${date[0]}-${date[1].toString().padStart(2, '0')}-${date[2].toString().padStart(2, '0')} ${date[3]?.toString().padStart(2, '0') ?? '00'}:${date[4]?.toString().padStart(2, '0') ?? '00'}:${date[5]?.toString().padStart(2, '0') ?? '00'}`
|
||||
return date;
|
||||
// return `${date[0]}-${date[1].toString().padStart(2, '0')}-${date[2].toString().padStart(2, '0')} ${date[3]?.toString().padStart(2, '0') ?? '00'}:${date[4]?.toString().padStart(2, '0') ?? '00'}:${date[5]?.toString().padStart(2, '0') ?? '00'}`
|
||||
},
|
||||
showDetails(error) {
|
||||
console.log("click")
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ export default {
|
|||
this.showModal = true;
|
||||
},
|
||||
buildDate(date) {
|
||||
return `${date[0]}-${date[1].toString().padStart(2, '0')}-${date[2].toString().padStart(2, '0')}`
|
||||
return new Date(date).toLocaleDateString('en-EN')
|
||||
},
|
||||
async closeModal(data) {
|
||||
if (data.action === 'accept') {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import org.springframework.scheduling.annotation.EnableAsync;
|
|||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
@Configuration
|
||||
@EnableAsync
|
||||
|
|
@ -29,18 +30,24 @@ public class AsyncConfig {
|
|||
executor.setCorePoolSize(4);
|
||||
executor.setMaxPoolSize(8);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setThreadNamePrefix("calc-");
|
||||
executor.setThreadNamePrefix("lookup-");
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Bean(name = "bulkProcessingExecutor")
|
||||
public Executor bulkProcessingExecutor() {
|
||||
public ThreadPoolTaskExecutor bulkProcessingExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(1);
|
||||
executor.setMaxPoolSize(1);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setThreadNamePrefix("bulk-processing-");
|
||||
executor.setThreadNamePrefix("bulk-");
|
||||
|
||||
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||
executor.setAwaitTerminationSeconds(600);
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
executor.setAllowCoreThreadTimeOut(false);
|
||||
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,16 +8,21 @@ import de.avatic.lcc.service.bulk.BulkOperationService;
|
|||
import de.avatic.lcc.service.bulk.TemplateExportService;
|
||||
import de.avatic.lcc.util.exception.base.BadRequestException;
|
||||
import jakarta.annotation.security.RolesAllowed;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.io.ByteArrayResource;
|
||||
import org.springframework.core.io.InputStreamResource;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* REST controller for handling bulk operations, including file uploads, template generation,
|
||||
|
|
@ -31,9 +36,26 @@ public class BulkOperationController {
|
|||
private final BulkOperationService bulkOperationService;
|
||||
private final TemplateExportService templateExportService;
|
||||
|
||||
public BulkOperationController(BulkOperationService bulkOperationService, TemplateExportService templateExportService) {
|
||||
|
||||
private final ThreadPoolTaskExecutor executor;
|
||||
|
||||
public BulkOperationController(BulkOperationService bulkOperationService, TemplateExportService templateExportService, @Qualifier("bulkProcessingExecutor") ThreadPoolTaskExecutor executor) {
|
||||
this.bulkOperationService = bulkOperationService;
|
||||
this.templateExportService = templateExportService;
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@GetMapping("/monitor")
|
||||
@PreAuthorize("hasAnyRole('SUPER', 'FREIGHT', 'PACKAGING', 'MATERIAL')")
|
||||
public Map<String, Object> getBulkThreadInfo() {
|
||||
Map<String, Object> info = new HashMap<>();
|
||||
info.put("activeCount", executor.getActiveCount());
|
||||
info.put("poolSize", executor.getPoolSize());
|
||||
info.put("corePoolSize", executor.getCorePoolSize());
|
||||
info.put("maxPoolSize", executor.getMaxPoolSize());
|
||||
info.put("queueSize", executor.getThreadPoolExecutor().getQueue().size());
|
||||
info.put("completedTaskCount", executor.getThreadPoolExecutor().getCompletedTaskCount());
|
||||
return info;
|
||||
}
|
||||
|
||||
@GetMapping({"/status/", "/status"})
|
||||
|
|
|
|||
|
|
@ -118,6 +118,9 @@ public class BulkOperationRepository {
|
|||
|
||||
@Transactional
|
||||
public List<BulkOperation> listByUserId(Integer userId) {
|
||||
|
||||
timeout(userId);
|
||||
|
||||
String sql = """
|
||||
SELECT id, user_id, bulk_file_type, bulk_processing_type, state, created_at, validity_period_id
|
||||
FROM bulk_operation
|
||||
|
|
@ -129,6 +132,15 @@ public class BulkOperationRepository {
|
|||
return jdbcTemplate.query(sql, new BulkOperationRowMapper(true), userId);
|
||||
}
|
||||
|
||||
private void timeout(Integer userId) {
|
||||
|
||||
String sql = """
|
||||
UPDATE bulk_operation SET state = 'EXCEPTION' WHERE user_id = ? AND state = 'PROCESSING' AND created_at < NOW() - INTERVAL 30 MINUTE
|
||||
""";
|
||||
|
||||
jdbcTemplate.update(sql, userId);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public Optional<BulkOperation> getOperationById(Integer id) {
|
||||
String sql = """
|
||||
|
|
@ -162,30 +174,30 @@ public class BulkOperationRepository {
|
|||
|
||||
private record BulkOperationRowMapper(boolean skipFile) implements RowMapper<BulkOperation> {
|
||||
|
||||
BulkOperationRowMapper() {
|
||||
this(false);
|
||||
}
|
||||
BulkOperationRowMapper() {
|
||||
this(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BulkOperation mapRow(ResultSet rs, int rowNum) throws SQLException {
|
||||
BulkOperation operation = new BulkOperation();
|
||||
operation.setId(rs.getInt("id"));
|
||||
operation.setUserId(rs.getInt("user_id"));
|
||||
operation.setProcessingType(BulkProcessingType.valueOf(rs.getString("bulk_processing_type")));
|
||||
operation.setFileType(BulkFileType.valueOf(rs.getString("bulk_file_type")));
|
||||
operation.setProcessState(BulkOperationState.valueOf(rs.getString("state")));
|
||||
public BulkOperation mapRow(ResultSet rs, int rowNum) throws SQLException {
|
||||
BulkOperation operation = new BulkOperation();
|
||||
operation.setId(rs.getInt("id"));
|
||||
operation.setUserId(rs.getInt("user_id"));
|
||||
operation.setProcessingType(BulkProcessingType.valueOf(rs.getString("bulk_processing_type")));
|
||||
operation.setFileType(BulkFileType.valueOf(rs.getString("bulk_file_type")));
|
||||
operation.setProcessState(BulkOperationState.valueOf(rs.getString("state")));
|
||||
|
||||
|
||||
operation.setValidityPeriodId(rs.getInt("validity_period_id"));
|
||||
if (rs.wasNull())
|
||||
operation.setValidityPeriodId(null);
|
||||
operation.setValidityPeriodId(rs.getInt("validity_period_id"));
|
||||
if (rs.wasNull())
|
||||
operation.setValidityPeriodId(null);
|
||||
|
||||
if (!skipFile)
|
||||
operation.setFile(rs.getBytes("file"));
|
||||
operation.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
|
||||
return operation;
|
||||
}
|
||||
if (!skipFile)
|
||||
operation.setFile(rs.getBytes("file"));
|
||||
operation.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
|
||||
return operation;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,10 +7,14 @@ import de.avatic.lcc.model.db.error.SysErrorType;
|
|||
import de.avatic.lcc.repositories.bulk.BulkOperationRepository;
|
||||
import de.avatic.lcc.repositories.error.SysErrorRepository;
|
||||
import de.avatic.lcc.service.transformer.error.SysErrorTransformer;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Service
|
||||
public class BulkOperationExecutionService {
|
||||
|
|
@ -20,17 +24,43 @@ public class BulkOperationExecutionService {
|
|||
private final BulkImportService bulkImportService;
|
||||
private final SysErrorRepository sysErrorRepository;
|
||||
private final SysErrorTransformer sysErrorTransformer;
|
||||
private final Executor bulkProcessingExecutor;
|
||||
|
||||
public BulkOperationExecutionService(BulkOperationRepository bulkOperationRepository, BulkExportService bulkExportService, BulkImportService bulkImportService, SysErrorRepository sysErrorRepository, SysErrorTransformer sysErrorTransformer) {
|
||||
public BulkOperationExecutionService(BulkOperationRepository bulkOperationRepository, BulkExportService bulkExportService, BulkImportService bulkImportService, SysErrorRepository sysErrorRepository, SysErrorTransformer sysErrorTransformer, @Qualifier("bulkProcessingExecutor") Executor bulkProcessingExecutor) {
|
||||
this.bulkOperationRepository = bulkOperationRepository;
|
||||
this.bulkExportService = bulkExportService;
|
||||
this.bulkImportService = bulkImportService;
|
||||
this.sysErrorRepository = sysErrorRepository;
|
||||
this.sysErrorTransformer = sysErrorTransformer;
|
||||
this.bulkProcessingExecutor = bulkProcessingExecutor;
|
||||
}
|
||||
|
||||
@Async("bulkProcessingExecutor")
|
||||
public void launchExecution(Integer id) {
|
||||
public CompletableFuture<Void> launchExecution(Integer id) {
|
||||
return CompletableFuture.runAsync(() -> {
|
||||
execution(id);
|
||||
}, bulkProcessingExecutor)
|
||||
.orTimeout(30, TimeUnit.MINUTES)
|
||||
.exceptionally(e -> {
|
||||
bulkOperationRepository.updateState(id, BulkOperationState.EXCEPTION);
|
||||
|
||||
|
||||
var error = new SysError();
|
||||
error.setType(SysErrorType.BULK);
|
||||
error.setCode(e.getClass().getSimpleName());
|
||||
error.setTitle("Bulk operation execution id" + id + " failed with timeout");
|
||||
error.setMessage(e.getMessage() == null ? "" : e.getMessage());
|
||||
error.setUserId(null);
|
||||
error.setBulkOperationId(id);
|
||||
error.setTrace(Arrays.stream(e.getStackTrace()).map(sysErrorTransformer::toSysErrorTraceItem).toList());
|
||||
|
||||
sysErrorRepository.insert(error);
|
||||
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void execution(Integer id) {
|
||||
|
||||
var operation = bulkOperationRepository.getOperationById(id);
|
||||
|
||||
|
|
@ -49,14 +79,14 @@ public class BulkOperationExecutionService {
|
|||
op.setProcessState(BulkOperationState.COMPLETED);
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
} catch (Throwable e) {
|
||||
op.setProcessState(BulkOperationState.EXCEPTION);
|
||||
|
||||
var error = new SysError();
|
||||
error.setType(SysErrorType.BULK);
|
||||
error.setCode(e.getClass().getSimpleName());
|
||||
error.setTitle("Bulk operation execution " + op.getProcessingType() + " of " + op.getFileType() + " failed");
|
||||
error.setMessage(e.getMessage());
|
||||
error.setMessage(e.getMessage() == null ? "" : e.getMessage());
|
||||
error.setUserId(op.getUserId());
|
||||
error.setBulkOperationId(op.getId());
|
||||
error.setTrace(Arrays.stream(e.getStackTrace()).map(sysErrorTransformer::toSysErrorTraceItem).toList());
|
||||
|
|
@ -66,13 +96,7 @@ public class BulkOperationExecutionService {
|
|||
|
||||
bulkOperationRepository.update(op);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,6 @@ public class ReportingService {
|
|||
var periodId = tuple.get().periodId();
|
||||
var setId = tuple.get().propertySetId();
|
||||
|
||||
|
||||
var jobs = new ArrayList<CalculationJob>();
|
||||
|
||||
if(!nodeIds.isEmpty())
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue