web: backend: Implement job queue

2025-11-06 15:22:19 -06:00
parent 5c7b8038a1
commit 35dd5ab1a9
6 changed files with 270 additions and 113 deletions

@@ -4,64 +4,9 @@
module Backend
using HTTP
using Random
using JSON
using CodecZlib
using UnitCommitment
basedir = joinpath(dirname(@__FILE__), "..")
function submit(req; optimizer)
# Check if request body is empty
compressed_body = HTTP.payload(req)
if isempty(compressed_body)
return HTTP.Response(400, "Error: No file provided")
end
# Validate compressed JSON by decompressing and parsing
try
decompressed_data = transcode(GzipDecompressor, compressed_body)
JSON.parse(String(decompressed_data))
catch e
return HTTP.Response(400, "Error: Invalid compressed JSON")
end
# Generate random job ID (lowercase letters and numbers)
job_id = randstring(['a':'z'; '0':'9'], 16)
# Create job directory
job_dir = joinpath(basedir, "jobs", job_id)
mkpath(job_dir)
# Save input file
json_path = joinpath(job_dir, "input.json.gz")
write(json_path, compressed_body)
# Optimize file
instance = UnitCommitment.read(json_path)
model = UnitCommitment.build_model(; instance, optimizer)
UnitCommitment.optimize!(model)
solution = UnitCommitment.solution(model)
UnitCommitment.write("$job_dir/output.json", solution)
# Return job ID as JSON
response_body = JSON.json(Dict("job_id" => job_id))
return HTTP.Response(200, response_body)
end
function jobs_view(req)
return HTTP.Response(200, "OK")
end
function start_server(port::Int = 8080; optimizer)
Random.seed!()
router = HTTP.Router()
HTTP.register!(router, "POST", "/submit", req -> submit(req; optimizer))
HTTP.register!(router, "GET", "/jobs/*/view", jobs_view)
server = HTTP.serve!(router, port; verbose = false)
return server
end
include("jobs.jl")
include("server.jl")
end
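Taken together with the two new files below, the job lifecycle is: POST /submit validates the gzipped JSON, stores it under a fresh job directory, and enqueues the job id; the JobProcessor worker then solves the instance and writes its results next to the input. Per job, the on-disk layout under Backend.basedir looks like this (a sketch assembled from submit and run! below):

jobs/<job_id>/       # job_id: 16 random characters from [a-z0-9]
    input.json.gz    # gzip-compressed UnitCommitment instance, exactly as uploaded
    output.log       # captured stdout/stderr of the solve; error and stacktrace on failure
    output.json      # UnitCommitment.jl solution, written on success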

web/backend/src/jobs.jl (new file, 99 lines)

@@ -0,0 +1,99 @@
# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment
# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
using UnitCommitment
import Base: put!
Base.@kwdef mutable struct JobProcessor
pending::Channel{String} = Channel{String}(Inf)  # job ids waiting to be picked up
processing::Channel{String} = Channel{String}(Inf)  # job ids currently being solved
shutdown::Channel{Bool} = Channel{Bool}(1)  # signal written by stop()
worker_task::Union{Task,Nothing} = nothing  # async task running run!()
optimizer = nothing  # JuMP optimizer, e.g. HiGHS.Optimizer
end
function Base.put!(processor::JobProcessor, job_id::String)
@info "New job received: $job_id"
return put!(processor.pending, job_id)
end
function isbusy(processor::JobProcessor)
return isready(processor.pending) || isready(processor.processing)
end
function run!(processor::JobProcessor)
while true
# Check for shutdown signal
if isready(processor.shutdown)
break
end
# Wait for a job with timeout
if !isready(processor.pending)
sleep(0.1)
continue
end
# Move job from pending to processing queue in one step, so that
# isbusy() never briefly reports false while a job is still in flight
job_id = take!(processor.pending)
put!(processor.processing, job_id)
@info "Processing job: $job_id"
job_dir = joinpath(basedir, "jobs", job_id)
log_path = joinpath(job_dir, "output.log")
# Run optimization
try
open(log_path, "w") do io
redirect_stdout(io) do
redirect_stderr(io) do
json_path = joinpath(job_dir, "input.json.gz")
instance = UnitCommitment.read(json_path)
model = UnitCommitment.build_model(;
instance,
optimizer = processor.optimizer,
)
UnitCommitment.optimize!(model)
solution = UnitCommitment.solution(model)
UnitCommitment.write(
joinpath(job_dir, "output.json"),
solution,
)
end
end
end
catch e
open(log_path, "a") do io
println(io, "\nError: ", e)
println(io, "\nStacktrace:")
Base.show_backtrace(io, catch_backtrace())
end
finally
# Always remove the job from the processing queue, even when the solve
# fails, so that isbusy() does not report a failed job forever
take!(processor.processing)
end
end
end
function start(processor::JobProcessor)
processor.worker_task = @async run!(processor)
return
end
function stop(processor::JobProcessor)
# Signal worker to stop
put!(processor.shutdown, true)
# Wait for worker to finish
if processor.worker_task !== nothing
try
wait(processor.worker_task)
catch
# Worker may have already exited
end
end
return
end
export JobProcessor, start, stop, put!, isbusy
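For reviewers, a minimal sketch of how the processor is meant to be driven, mirroring jobs_test.jl further down (HiGHS as the optimizer and a pre-populated jobs/<job_id>/input.json.gz are assumptions taken from the tests):

using Backend, HiGHS

processor = JobProcessor(optimizer = HiGHS.Optimizer)
start(processor)            # launches the asynchronous worker loop (run!)
put!(processor, "qwe123")   # enqueue a job whose input.json.gz is already on disk
while isbusy(processor)     # true while jobs are pending or in flight
    sleep(0.1)
end
stop(processor)             # signal shutdown and wait for the worker task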

web/backend/src/server.jl (new file, 73 lines)

@@ -0,0 +1,73 @@
using HTTP
using Random
using JSON
using CodecZlib
using UnitCommitment
struct ServerHandle
server::HTTP.Server
processor::JobProcessor
end
function submit(req, processor::JobProcessor)
# Check if request body is empty
compressed_body = HTTP.payload(req)
if isempty(compressed_body)
return HTTP.Response(400, "Error: No file provided")
end
# Validate compressed JSON by decompressing and parsing
try
decompressed_data = transcode(GzipDecompressor, compressed_body)
JSON.parse(String(decompressed_data))
catch e
return HTTP.Response(400, "Error: Invalid compressed JSON")
end
# Generate random job ID (lowercase letters and numbers)
job_id = randstring(['a':'z'; '0':'9'], 16)
# Create job directory
job_dir = joinpath(basedir, "jobs", job_id)
mkpath(job_dir)
# Save input file
json_path = joinpath(job_dir, "input.json.gz")
write(json_path, compressed_body)
# Add job to queue
put!(processor, job_id)
# Return job ID as JSON
response_body = JSON.json(Dict("job_id" => job_id))
return HTTP.Response(200, response_body)
end
function jobs_view(req)
return HTTP.Response(200, "OK")
end
function start_server(port::Int = 8080; optimizer)
Random.seed!()
# Create and start job processor
processor = JobProcessor(optimizer = optimizer)
start(processor)
router = HTTP.Router()
# Register /submit endpoint
HTTP.register!(router, "POST", "/submit", req -> submit(req, processor))
# Register job/*/view endpoint
HTTP.register!(router, "GET", "/jobs/*/view", jobs_view)
server = HTTP.serve!(router, port; verbose = false)
return ServerHandle(server, processor)
end
function stop(handle::ServerHandle)
stop(handle.processor)
close(handle.server)
return nothing
end
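A quick end-to-end sketch of the submit flow from a client's point of view, condensed from server_test.jl below (the port, fixture path, and HiGHS optimizer are stand-ins):

using HTTP, JSON, HiGHS
import Backend

handle = Backend.start_server(8080; optimizer = HiGHS.Optimizer)
body = read("case14.json.gz")    # any gzip-compressed UnitCommitment instance
resp = HTTP.post(
    "http://localhost:8080/submit",
    ["Content-Type" => "application/gzip"],
    body,
)
job_id = JSON.parse(String(resp.body))["job_id"]
# The worker writes output.log and output.json under jobs/<job_id>/;
# poll Backend.isbusy(handle.processor) before reading them, then shut down:
Backend.stop(handle)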

@@ -12,73 +12,26 @@ import Backend
 import JuliaFormatter
 using HiGHS
-basedir = dirname(@__FILE__)
-port = 32617
+BASEDIR = dirname(@__FILE__)
+include("jobs_test.jl")
+include("server_test.jl")
 function fixture(path::String)::String
-return "$basedir/../fixtures/$path"
-end
-function with_server(f)
-logger = Test.TestLogger()
-# server = Base.CoreLogging.with_logger(logger) do
-# Backend.start_server(port)
-# end
-server = Backend.start_server(port; optimizer=HiGHS.Optimizer)
-try
-f()
-finally
-close(server)
-end
-return filter!(x -> x.group == :access, logger.logs)
-end
-function test_usage()
-with_server() do
-# Read the compressed fixture file
-compressed_data = read(fixture("case14.json.gz"))
-# Submit test case
-response = HTTP.post(
-"http://localhost:$port/submit",
-["Content-Type" => "application/gzip"],
-compressed_data
-)
-@test response.status == 200
-# Check response
-response_data = JSON.parse(String(response.body))
-@test haskey(response_data, "job_id")
-job_id = response_data["job_id"]
-@test length(job_id) == 16
-@test all(c -> c in ['a':'z'; '0':'9'], collect(job_id))
-# Verify the compressed file was saved correctly
-job_dir = joinpath(Backend.basedir, "jobs", job_id)
-saved_path = joinpath(job_dir, "input.json.gz")
-@test isfile(saved_path)
-saved_data = read(saved_path)
-@test saved_data == compressed_data
-response = HTTP.get("http://localhost:$port/jobs/123/view")
-@test response.status == 200
-@test String(response.body) == "OK"
-# Clean up: remove the job directory
-# rm(job_dir, recursive=true)
-end
+return "$BASEDIR/../fixtures/$path"
 end
 function runtests()
 @testset "UCJL Backend" begin
-test_usage()
+server_test_usage()
+# jobs_test_usage()
 end
 return
 end
 function format()
-JuliaFormatter.format(basedir, verbose = true)
-JuliaFormatter.format("$basedir/../../src", verbose = true)
+JuliaFormatter.format(BASEDIR, verbose = true)
+JuliaFormatter.format("$BASEDIR/../../src", verbose = true)
 return
 end

jobs_test.jl (new file, 43 lines)

@@ -0,0 +1,43 @@
# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment
# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
using Backend
using Test
using HiGHS
function jobs_test_usage()
@testset "JobProcessor" begin
# Setup job directory
job_id = "qwe123"
job_dir = joinpath(Backend.basedir, "jobs", job_id)
mkpath(job_dir)
cp(fixture("case14.json.gz"), joinpath(job_dir, "input.json.gz"))
try
# Create processor with HiGHS optimizer
processor = JobProcessor(optimizer = HiGHS.Optimizer)
# Start the worker
start(processor)
# Push job to queue
put!(processor, job_id)
# Wait until all jobs are processed
while isbusy(processor)
sleep(0.1)
end
# Check that solution file exists
output_path = joinpath(job_dir, "output.json")
@test isfile(output_path)
# Stop the worker
stop(processor)
finally
# Cleanup
rm(job_dir, recursive = true, force = true)
end
end
end

server_test.jl (new file, 44 lines)

@@ -0,0 +1,44 @@
# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment
# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
const PORT = 32617
function server_test_usage()
server = Backend.start_server(PORT; optimizer = HiGHS.Optimizer)
# Read the compressed fixture file
compressed_data = read(fixture("case14.json.gz"))
# Submit test case
response = HTTP.post(
"http://localhost:$PORT/submit",
["Content-Type" => "application/gzip"],
compressed_data,
)
@test response.status == 200
# Check response
response_data = JSON.parse(String(response.body))
@test haskey(response_data, "job_id")
job_id = response_data["job_id"]
@test length(job_id) == 16
# Wait until the worker has finished the job, then stop the server
while Backend.isbusy(server.processor)
sleep(0.1)
end
stop(server)
# Verify the compressed file was saved correctly
job_dir = joinpath(Backend.basedir, "jobs", job_id)
saved_input_path = joinpath(job_dir, "input.json.gz")
saved_log_path = joinpath(job_dir, "output.log")
saved_output_path = joinpath(job_dir, "output.json")
@test isfile(saved_input_path)
@test isfile(saved_log_path)
@test isfile(saved_output_path)
saved_data = read(saved_input_path)
@test saved_data == compressed_data
# Clean up: remove the job directory
# rm(job_dir, recursive=true)
end