Compare commits

3 Commits

5 changed files with 132 additions and 98 deletions

View File

@@ -2,8 +2,6 @@
# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
using UnitCommitment
import Base: put!
Base.@kwdef mutable struct JobProcessor
@@ -11,7 +9,7 @@ Base.@kwdef mutable struct JobProcessor
processing::Channel{String} = Channel{String}(Inf)
shutdown::Channel{Bool} = Channel{Bool}(1)
worker_task::Union{Task,Nothing} = nothing
optimizer = nothing
work_fn::Function
end
function Base.put!(processor::JobProcessor, job_id::String)
@@ -38,41 +36,13 @@ function run!(processor::JobProcessor)
# Move job from pending to processing queue
job_id = take!(processor.pending)
@info "Processing job: $job_id"
job_dir = joinpath(basedir, "jobs", job_id)
log_path = joinpath(job_dir, "output.log")
put!(processor.processing, job_id)
# Run optimization
try
open(log_path, "w") do io
redirect_stdout(io) do
redirect_stderr(io) do
json_path = joinpath(job_dir, "input.json.gz")
instance = UnitCommitment.read(json_path)
model = UnitCommitment.build_model(;
instance,
optimizer = processor.optimizer,
)
UnitCommitment.optimize!(model)
solution = UnitCommitment.solution(model)
return UnitCommitment.write(
joinpath(job_dir, "output.json"),
solution,
)
end
end
end
# Run work function
processor.work_fn(job_id)
# Remove job from processing queue
take!(processor.processing)
catch e
open(log_path, "a") do io
println(io, "\nError: ", e)
println(io, "\nStacktrace:")
return Base.show_backtrace(io, catch_backtrace())
end
end
end
end
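
The net effect of this file's changes is that JobProcessor no longer knows about UnitCommitment or optimizers; it just drains a queue and calls an injected work_fn. A standalone sketch of the same Channel-based worker pattern (illustrative names, not the actual Backend API):

mutable struct MiniProcessor
    pending::Channel{String}      # jobs waiting to run
    processing::Channel{String}   # holds the in-flight job, if any
    work_fn::Function             # injected by the caller
end

MiniProcessor(work_fn) =
    MiniProcessor(Channel{String}(Inf), Channel{String}(Inf), work_fn)

function start!(p::MiniProcessor)
    @async while true
        job_id = take!(p.pending)    # blocks until a job is submitted
        put!(p.processing, job_id)   # mark as in-flight
        p.work_fn(job_id)            # do the actual work
        take!(p.processing)          # mark as done
    end
end

p = MiniProcessor(id -> println("working on ", id))
start!(p)
put!(p.pending, "job-1")
sleep(0.1)   # give the worker task a chance to run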

View File

@@ -44,14 +44,73 @@ function submit(req, processor::JobProcessor)
end
function jobs_view(req)
return HTTP.Response(200, "OK")
# Extract job_id from URL path /jobs/{job_id}/view
path_parts = split(req.target, '/')
job_id = path_parts[3] # /jobs/{job_id}/view -> index 3
# Construct job directory path
job_dir = joinpath(basedir, "jobs", job_id)
# Check if job directory exists
if !isdir(job_dir)
return HTTP.Response(404, "Job not found")
end
# Read log file if it exists
log_path = joinpath(job_dir, "output.log")
log_content = isfile(log_path) ? read(log_path, String) : nothing
# Read output.json if it exists
output_path = joinpath(job_dir, "output.json")
output_content = isfile(output_path) ? read(output_path, String) : nothing
# Create response JSON
response_data = Dict(
"log" => log_content,
"solution" => output_content
)
response_body = JSON.json(response_data)
return HTTP.Response(200, response_body)
end
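
Why index 3 works: splitting a target that starts with a slash yields an empty first element. A quick REPL check:

julia> split("/jobs/qwe123/view", '/')
4-element Vector{SubString{String}}:
 ""
 "jobs"
 "qwe123"
 "view"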
function start_server(port::Int = 8080; optimizer)
Random.seed!()
function work_fn(job_id)
job_dir = joinpath(basedir, "jobs", job_id)
mkpath(job_dir)
input_filename = joinpath(job_dir, "input.json.gz")
log_filename = joinpath(job_dir, "output.log")
solution_filename = joinpath(job_dir, "output.json")
try
open(log_filename, "w") do io
redirect_stdout(io) do
redirect_stderr(io) do
instance = UnitCommitment.read(input_filename)
model = UnitCommitment.build_model(;
instance,
optimizer = optimizer,
)
UnitCommitment.optimize!(model)
solution = UnitCommitment.solution(model)
UnitCommitment.write(solution_filename, solution)
end
end
end
catch e
@error "Failed job: $job_id" e
open(log_filename, "a") do io
println(io, "\nError: ", e)
println(io, "\nStacktrace:")
return Base.show_backtrace(io, catch_backtrace())
end
end
return
end
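
The nested redirect calls in work_fn send both output streams into the same log file; the pattern in isolation (standalone sketch):

open("output.log", "w") do io
    redirect_stdout(io) do
        redirect_stderr(io) do
            println("captured from stdout")
            println(stderr, "captured from stderr")
        end
    end
end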
# Create and start job processor
processor = JobProcessor(optimizer = optimizer)
processor = JobProcessor(; work_fn)
start(processor)
router = HTTP.Router()
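
The hunk ends just as the router is created. Continuing from that line, a hedged sketch of how the new view route could be registered with HTTP.jl (the actual route table is outside this hunk, so the paths below are assumptions):

HTTP.register!(router, "POST", "/submit", req -> submit(req, processor))  # assumed path
HTTP.register!(router, "GET", "/jobs/{job_id}/view", jobs_view)           # matches the target parsing in jobs_view
HTTP.serve!(router, "127.0.0.1", port)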

View File

@@ -24,7 +24,7 @@ end
function runtests()
@testset "UCJL Backend" begin
server_test_usage()
# jobs_test_usage()
jobs_test_usage()
end
return
end

View File

@@ -4,40 +4,29 @@
using Backend
using Test
using HiGHS
function jobs_test_usage()
@testset "JobProcessor" begin
# Setup job directory
job_id = "qwe123"
job_dir = joinpath(Backend.basedir, "jobs", job_id)
mkpath(job_dir)
cp(fixture("case14.json.gz"), joinpath(job_dir, "input.json.gz"))
# Define dummy work function
received_job_id = []
function work_fn(job_id)
push!(received_job_id, job_id)
end
try
# Create processor with HiGHS optimizer
processor = JobProcessor(optimizer = HiGHS.Optimizer)
# Create processor with work function
processor = JobProcessor(; work_fn)
# Start the worker
start(processor)
# Push job to queue
put!(processor, job_id)
put!(processor, "test")
# Wait until all jobs are processed
while isbusy(processor)
# Wait for job to complete
sleep(0.1)
end
# Check that solution file exists
output_path = joinpath(job_dir, "output.json")
@test isfile(output_path)
# Stop the worker
stop(processor)
finally
# Cleanup
rm(job_dir, recursive = true, force = true)
end
# Check that the work function was called with correct job_id
@test received_job_id[1] == "test"
end
end
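
The rewritten test swaps the real solve for a recording stub, so the processor's queue mechanics can be tested without an optimizer. The capture idiom in isolation:

using Test
received = String[]   # typed vector; the diff uses an untyped []
work_fn(job_id) = push!(received, job_id)
work_fn("test")
@test received == ["test"]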

View File

@@ -6,7 +6,7 @@ const PORT = 32617
function server_test_usage()
server = Backend.start_server(PORT; optimizer = HiGHS.Optimizer)
try
# Read the compressed fixture file
compressed_data = read(fixture("case14.json.gz"))
@@ -24,9 +24,11 @@ function server_test_usage()
job_id = response_data["job_id"]
@test length(job_id) == 16
# Wait for jobs to finish and stop server
# Wait for jobs to finish
sleep(0.1)
stop(server)
while isbusy(server.processor)
sleep(0.1)
end
# Verify the compressed file was saved correctly
job_dir = joinpath(Backend.basedir, "jobs", job_id)
@@ -39,6 +41,20 @@ function server_test_usage()
saved_data = read(saved_input_path)
@test saved_data == compressed_data
# Clean up: remove the job directory
# rm(job_dir, recursive=true)
# Query job information
view_response = HTTP.get("http://localhost:$PORT/jobs/$job_id/view")
@test view_response.status == 200
# Check response
view_data = JSON.parse(String(view_response.body))
@test haskey(view_data, "log")
@test haskey(view_data, "solution")
@test view_data["log"] !== nothing
@test view_data["solution"] !== nothing
# Clean up
rm(job_dir, recursive=true)
finally
stop(server)
end
end
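
One caveat on the isbusy polling loop added above: if a job ever wedges, the test hangs forever. A timeout-guarded variant of the same wait (a sketch, not part of the diff; the 30 s budget is arbitrary):

deadline = time() + 30.0
while isbusy(server.processor) && time() < deadline
    sleep(0.1)
end
@test !isbusy(server.processor)   # fail loudly instead of hanging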