From ad8ee6fe6bfbf43f5a5fdaa1c5a20f5afdeee756 Mon Sep 17 00:00:00 2001 From: "Alinson S. Xavier" Date: Fri, 7 Nov 2025 11:16:13 -0600 Subject: [PATCH] web: backend: Make JobProcessor more abstract --- web/backend/src/jobs.jl | 30 +++++++------------ web/backend/src/server.jl | 18 +++++++++++- web/backend/test/src/jobs_test.jl | 48 ++++++++++++++----------------- 3 files changed, 49 insertions(+), 47 deletions(-) diff --git a/web/backend/src/jobs.jl b/web/backend/src/jobs.jl index 0149534..ba491a3 100644 --- a/web/backend/src/jobs.jl +++ b/web/backend/src/jobs.jl @@ -2,8 +2,6 @@ # Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. # Released under the modified BSD license. See COPYING.md for more details. -using UnitCommitment - import Base: put! Base.@kwdef mutable struct JobProcessor @@ -11,7 +9,7 @@ Base.@kwdef mutable struct JobProcessor processing::Channel{String} = Channel{String}(Inf) shutdown::Channel{Bool} = Channel{Bool}(1) worker_task::Union{Task,Nothing} = nothing - optimizer = nothing + work_fn::Function end function Base.put!(processor::JobProcessor, job_id::String) @@ -38,28 +36,21 @@ function run!(processor::JobProcessor) # Move job from pending to processing queue job_id = take!(processor.pending) - @info "Processing job: $job_id" - job_dir = joinpath(basedir, "jobs", job_id) - log_path = joinpath(job_dir, "output.log") put!(processor.processing, job_id) - # Run optimization + # Prepare directories + job_dir = joinpath(basedir, "jobs", job_id) + log_path = joinpath(job_dir, "output.log") + mkpath(job_dir) + + # Run work function try + @info "Processing job: $job_id" open(log_path, "w") do io redirect_stdout(io) do redirect_stderr(io) do - json_path = joinpath(job_dir, "input.json.gz") - instance = UnitCommitment.read(json_path) - model = UnitCommitment.build_model(; - instance, - optimizer = processor.optimizer, - ) - UnitCommitment.optimize!(model) - solution = UnitCommitment.solution(model) - return UnitCommitment.write( - joinpath(job_dir, "output.json"), - solution, - ) + processor.work_fn(job_id) + @info "Job $job_id done" end end end @@ -67,6 +58,7 @@ function run!(processor::JobProcessor) # Remove job from processing queue take!(processor.processing) catch e + @error "Failed job: $job_id" e open(log_path, "a") do io println(io, "\nError: ", e) println(io, "\nStacktrace:") diff --git a/web/backend/src/server.jl b/web/backend/src/server.jl index 2424c0e..c80836e 100644 --- a/web/backend/src/server.jl +++ b/web/backend/src/server.jl @@ -50,8 +50,24 @@ end function start_server(port::Int = 8080; optimizer) Random.seed!() + function work_fn(job_id) + job_dir = joinpath(basedir, "jobs", job_id) + json_path = joinpath(job_dir, "input.json.gz") + instance = UnitCommitment.read(json_path) + model = UnitCommitment.build_model(; + instance, + optimizer = optimizer, + ) + UnitCommitment.optimize!(model) + solution = UnitCommitment.solution(model) + return UnitCommitment.write( + joinpath(job_dir, "output.json"), + solution, + ) + end + # Create and start job processor - processor = JobProcessor(optimizer = optimizer) + processor = JobProcessor(; work_fn) start(processor) router = HTTP.Router() diff --git a/web/backend/test/src/jobs_test.jl b/web/backend/test/src/jobs_test.jl index e6a135f..6531ab6 100644 --- a/web/backend/test/src/jobs_test.jl +++ b/web/backend/test/src/jobs_test.jl @@ -4,36 +4,30 @@ using Backend using Test -using HiGHS function jobs_test_usage() @testset "JobProcessor" begin - # Setup job directory - job_id = "qwe123" - job_dir = joinpath(Backend.basedir, "jobs", job_id) - mkpath(job_dir) - cp(fixture("case14.json.gz"), joinpath(job_dir, "input.json.gz")) - - try - # Create processor with HiGHS optimizer - processor = JobProcessor(optimizer = HiGHS.Optimizer) - - # Start the worker - start(processor) - - # Push job to queue - put!(processor, job_id) - - # Stop worker (wait for jobs to finish) - sleep(0.1) - stop(processor) - - # Check that solution file exists - output_path = joinpath(job_dir, "output.json") - @test isfile(output_path) - finally - # Cleanup - rm(job_dir, recursive = true, force = true) + # Define dummy work function + received_job_id = [] + function work_fn(job_id) + @show received_job_id + push!(received_job_id, job_id) end + + # Create processor with work function + processor = JobProcessor(; work_fn) + + # Start the worker + start(processor) + + # Push job to queue + put!(processor, "test") + + # Wait for job to complete + sleep(0.1) + stop(processor) + + # Check that the work function was called with correct job_id + @test received_job_id[1] == "test" end end