Mirror of https://github.com/ANL-CEEESA/UnitCommitment.jl.git, synced 2025-12-06 00:08:52 -06:00
web: backend: Make JobProcessor more abstract
@@ -2,8 +2,6 @@
 # Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
 # Released under the modified BSD license. See COPYING.md for more details.

-using UnitCommitment
-
 import Base: put!

 Base.@kwdef mutable struct JobProcessor
@@ -11,7 +9,7 @@ Base.@kwdef mutable struct JobProcessor
     processing::Channel{String} = Channel{String}(Inf)
     shutdown::Channel{Bool} = Channel{Bool}(1)
     worker_task::Union{Task,Nothing} = nothing
-    optimizer = nothing
+    work_fn::Function
 end

 function Base.put!(processor::JobProcessor, job_id::String)
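Net effect of the two hunks above: JobProcessor drops its optimizer field and its UnitCommitment dependency, and instead stores an arbitrary work_fn::Function that receives a job id. A minimal usage sketch, assuming the start/stop/put! API that appears later in this diff and the default values of the channel fields:

# Sketch only: any callable taking a job id can now drive the processor.
processor = JobProcessor(; work_fn = job_id -> @info("Handling $job_id"))
start(processor)          # launch the worker task
put!(processor, "job-1")  # enqueue a job id
sleep(0.1)                # give the worker a moment to pick it up
stop(processor)           # shut the worker down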
@@ -38,28 +36,21 @@ function run!(processor::JobProcessor)

         # Move job from pending to processing queue
         job_id = take!(processor.pending)
-        @info "Processing job: $job_id"
-        job_dir = joinpath(basedir, "jobs", job_id)
-        log_path = joinpath(job_dir, "output.log")
         put!(processor.processing, job_id)

-        # Run optimization
+        # Prepare directories
+        job_dir = joinpath(basedir, "jobs", job_id)
+        log_path = joinpath(job_dir, "output.log")
+        mkpath(job_dir)
+
+        # Run work function
         try
+            @info "Processing job: $job_id"
             open(log_path, "w") do io
                 redirect_stdout(io) do
                     redirect_stderr(io) do
-                        json_path = joinpath(job_dir, "input.json.gz")
-                        instance = UnitCommitment.read(json_path)
-                        model = UnitCommitment.build_model(;
-                            instance,
-                            optimizer = processor.optimizer,
-                        )
-                        UnitCommitment.optimize!(model)
-                        solution = UnitCommitment.solution(model)
-                        return UnitCommitment.write(
-                            joinpath(job_dir, "output.json"),
-                            solution,
-                        )
+                        processor.work_fn(job_id)
+                        @info "Job $job_id done"
                     end
                 end
             end
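The run! hunk above keeps the existing per-job log capture and only swaps the inlined UnitCommitment calls for processor.work_fn(job_id). For reference, the capture pattern in isolation (a standalone sketch, not code from this repository):

# Run f() with stdout and stderr redirected into a per-job log file.
function with_job_log(f, log_path::AbstractString)
    open(log_path, "w") do io
        redirect_stdout(io) do
            redirect_stderr(io) do
                f()
            end
        end
    end
end

with_job_log("output.log") do
    println("anything the work function prints lands in output.log")
end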
@@ -67,6 +58,7 @@ function run!(processor::JobProcessor)
             # Remove job from processing queue
             take!(processor.processing)
         catch e
             @error "Failed job: $job_id" e
             open(log_path, "a") do io
                 println(io, "\nError: ", e)
+                println(io, "\nStacktrace:")
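The error-handling hunk is cut off after the "\nStacktrace:" header. For completeness, one common way to append the exception and its backtrace to the log in Julia looks like the sketch below, reusing the e and log_path already in scope in the catch block; whether the commit does exactly this is not visible here.

# Hypothetical continuation of the catch block shown above.
open(log_path, "a") do io
    println(io, "\nError: ", e)
    println(io, "\nStacktrace:")
    showerror(io, e, catch_backtrace())  # backtrace of the most recently caught exception
    println(io)
end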
@@ -50,8 +50,24 @@ end
 function start_server(port::Int = 8080; optimizer)
     Random.seed!()

+    function work_fn(job_id)
+        job_dir = joinpath(basedir, "jobs", job_id)
+        json_path = joinpath(job_dir, "input.json.gz")
+        instance = UnitCommitment.read(json_path)
+        model = UnitCommitment.build_model(;
+            instance,
+            optimizer = optimizer,
+        )
+        UnitCommitment.optimize!(model)
+        solution = UnitCommitment.solution(model)
+        return UnitCommitment.write(
+            joinpath(job_dir, "output.json"),
+            solution,
+        )
+    end
+
     # Create and start job processor
-    processor = JobProcessor(optimizer = optimizer)
+    processor = JobProcessor(; work_fn)
     start(processor)

     router = HTTP.Router()
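With this change the UnitCommitment-specific work lives entirely in start_server: work_fn is a closure over the caller's optimizer, so JobProcessor itself stays solver-agnostic. A hypothetical invocation, using HiGHS (the solver the old test used) purely as an example:

using HiGHS

# Hypothetical: serve on port 8080 and solve submitted jobs with HiGHS.
start_server(8080; optimizer = HiGHS.Optimizer)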
@@ -4,36 +4,30 @@
 using Backend
 using Test
-using HiGHS

 function jobs_test_usage()
     @testset "JobProcessor" begin
-        # Setup job directory
-        job_id = "qwe123"
-        job_dir = joinpath(Backend.basedir, "jobs", job_id)
-        mkpath(job_dir)
-        cp(fixture("case14.json.gz"), joinpath(job_dir, "input.json.gz"))
-
-        try
-            # Create processor with HiGHS optimizer
-            processor = JobProcessor(optimizer = HiGHS.Optimizer)
-
-            # Start the worker
-            start(processor)
-
-            # Push job to queue
-            put!(processor, job_id)
-
-            # Stop worker (wait for jobs to finish)
-            sleep(0.1)
-            stop(processor)
-
-            # Check that solution file exists
-            output_path = joinpath(job_dir, "output.json")
-            @test isfile(output_path)
-        finally
-            # Cleanup
-            rm(job_dir, recursive = true, force = true)
+        # Define dummy work function
+        received_job_id = []
+        function work_fn(job_id)
+            @show received_job_id
+            push!(received_job_id, job_id)
         end
+
+        # Create processor with work function
+        processor = JobProcessor(; work_fn)
+
+        # Start the worker
+        start(processor)
+
+        # Push job to queue
+        put!(processor, "test")
+
+        # Wait for job to complete
+        sleep(0.1)
+        stop(processor)
+
+        # Check that the work function was called with correct job_id
+        @test received_job_id[1] == "test"
     end
 end
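The rewritten test no longer needs a solver at all: the dummy work_fn just records the job id, and sleep(0.1) gives the worker time to run before the assertion. If that timing ever proves flaky, a sleep-free variant is possible by having the dummy work_fn signal through a Channel (hypothetical sketch, not part of the commit):

done = Channel{String}(1)
processor = JobProcessor(; work_fn = job_id -> put!(done, job_id))
start(processor)
put!(processor, "test")
@test take!(done) == "test"  # blocks until work_fn has run
stop(processor)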