web/backend: use multiprocessing instead of threads; improve logging

This commit is contained in:
2025-11-12 11:52:30 -06:00
parent 117bf478f4
commit c9e18d4fe4
9 changed files with 102 additions and 32 deletions

View File

@@ -5,13 +5,21 @@ authors = ["Alinson S. Xavier <git@axavier.org>"]
[deps] [deps]
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
UnitCommitment = "64606440-39ea-11e9-0f29-3303a1d3d877" UnitCommitment = "64606440-39ea-11e9-0f29-3303a1d3d877"
[compat] [compat]
CodecZlib = "0.7.8" CodecZlib = "0.7.8"
Dates = "1.11.0"
Distributed = "1.11.0"
HTTP = "1.10.19" HTTP = "1.10.19"
JSON = "0.21.4" JSON = "0.21.4"
Logging = "1.11.0"
Printf = "1.11.0"
Random = "1.11.0" Random = "1.11.0"

View File

@@ -8,5 +8,6 @@ basedir = joinpath(dirname(@__FILE__), "..")
include("jobs.jl") include("jobs.jl")
include("server.jl") include("server.jl")
include("log.jl")
end end

View File

@@ -2,18 +2,19 @@
# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved. # Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details. # Released under the modified BSD license. See COPYING.md for more details.
using Distributed
import Base: put! import Base: put!
Base.@kwdef mutable struct JobProcessor Base.@kwdef mutable struct JobProcessor
pending::Channel{String} = Channel{String}(Inf) pending = RemoteChannel(() -> Channel{String}(Inf))
processing::Channel{String} = Channel{String}(Inf) processing = RemoteChannel(() -> Channel{String}(Inf))
shutdown::Channel{Bool} = Channel{Bool}(1) shutdown = RemoteChannel(() -> Channel{Bool}(1))
worker_task::Union{Task,Nothing} = nothing worker_pid = nothing
work_fn::Function monitor_task = nothing
work_fn = nothing
end end
function Base.put!(processor::JobProcessor, job_id::String) function Base.put!(processor::JobProcessor, job_id::String)
@info "New job received: $job_id"
return put!(processor.pending, job_id) return put!(processor.pending, job_id)
end end
@@ -21,46 +22,58 @@ function isbusy(processor::JobProcessor)
return isready(processor.pending) || isready(processor.processing) return isready(processor.pending) || isready(processor.processing)
end end
function run!(processor::JobProcessor) function worker_loop(pending, processing, shutdown, work_fn)
@info "Starting worker loop"
while true while true
# Check for shutdown signal # Check for shutdown signal
if isready(processor.shutdown) if isready(shutdown)
@info "Shutdown signal received"
break break
end end
# Wait for a job with timeout # Wait for a job with timeout
if !isready(processor.pending) if !isready(pending)
sleep(0.1) sleep(0.1)
continue continue
end end
# Move job from pending to processing queue # Move job from pending to processing queue
job_id = take!(processor.pending) job_id = take!(pending)
put!(processor.processing, job_id) put!(processing, job_id)
@info "Job started: $job_id"
# Run work function # Run work function
processor.work_fn(job_id) try
work_fn(job_id)
catch e
@error "Job failed: job $job_id"
end
# Remove job from processing queue # Remove job from processing queue
take!(processor.processing) take!(processing)
@info "Job finished: $job_id"
end end
end end
function start(processor::JobProcessor) function start(processor::JobProcessor)
processor.worker_task = Threads.@spawn run!(processor) processor.monitor_task = @spawn begin
worker_loop(
processor.pending,
processor.processing,
processor.shutdown,
processor.work_fn,
)
end
return return
end end
function stop(processor::JobProcessor) function stop(processor::JobProcessor)
# Signal worker to stop
put!(processor.shutdown, true) put!(processor.shutdown, true)
if processor.monitor_task !== nothing
# Wait for worker to finish
if processor.worker_task !== nothing
try try
wait(processor.worker_task) wait(processor.monitor_task)
catch catch e
# Worker may have already exited @warn "Error waiting for worker task" exception=e
end end
end end
return return

36
web/backend/src/log.jl Normal file
View File

@@ -0,0 +1,36 @@
# UnitCommitment.jl: Optimization Package for Security-Constrained Unit Commitment
# Copyright (C) 2025, UChicago Argonne, LLC. All rights reserved.
# Released under the modified BSD license. See COPYING.md for more details.
import Logging: min_enabled_level, shouldlog, handle_message
using Base.CoreLogging, Logging, Dates
struct TimeLogger <: AbstractLogger end
min_enabled_level(::TimeLogger) = CoreLogging.Info
shouldlog(logger::TimeLogger, level, _module, group, id) = true
function handle_message(
logger::TimeLogger,
level,
message,
_module,
group,
id,
filepath,
line;
kwargs...,
)
current_time = Dates.format(now(), "yyyy-mm-dd HH:MM:SS.sss")
print("[$current_time] ")
println(message)
flush(stdout)
flush(stderr)
return Base.Libc.flush_cstdio()
end
function setup_logger()
global_logger(TimeLogger())
@spawn global_logger(TimeLogger())
return
end

View File

@@ -107,7 +107,6 @@ function start_server(host, port; optimizer)
end end
end end
catch e catch e
@error "Failed job: $job_id" e
open(log_filename, "a") do io open(log_filename, "a") do io
println(io, "\nError: ", e) println(io, "\nError: ", e)
println(io, "\nStacktrace:") println(io, "\nStacktrace:")

View File

@@ -6,18 +6,18 @@ authors = ["Alinson S. Xavier <git@axavier.org>"]
[deps] [deps]
Backend = "948642ed-e3f9-4642-9296-0f1eaf40c938" Backend = "948642ed-e3f9-4642-9296-0f1eaf40c938"
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
HiGHS = "87dc4568-4c63-4d18-b0c0-bb2238e4078b" HiGHS = "87dc4568-4c63-4d18-b0c0-bb2238e4078b"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
JuliaFormatter = "98e50ef6-434e-11e9-1051-2b60c6c9e899" JuliaFormatter = "98e50ef6-434e-11e9-1051-2b60c6c9e899"
Revise = "295af30f-e4ad-537b-8983-00126c2a3abe"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
[compat] [compat]
CodecZlib = "0.7.8" CodecZlib = "0.7.8"
Distributed = "1.11.0"
HTTP = "1.10.19" HTTP = "1.10.19"
HiGHS = "1.20.1" HiGHS = "1.20.1"
JSON = "0.21.4" JSON = "0.21.4"
JuliaFormatter = "2.2.0" JuliaFormatter = "2.2.0"
Revise = "3.12.0"
Test = "1.11.0" Test = "1.11.0"

View File

@@ -4,6 +4,8 @@
module BackendT module BackendT
using Distributed
using Test using Test
using HTTP using HTTP
using JSON using JSON
@@ -22,6 +24,7 @@ function fixture(path::String)::String
end end
function runtests() function runtests()
Backend.setup_logger()
@testset "UCJL Backend" begin @testset "UCJL Backend" begin
server_test_usage() server_test_usage()
jobs_test_usage() jobs_test_usage()

View File

@@ -7,10 +7,14 @@ using Test
function jobs_test_usage() function jobs_test_usage()
@testset "JobProcessor" begin @testset "JobProcessor" begin
# Define dummy work function # Create a temporary directory for test output
received_job_id = [] test_dir = mktempdir()
# Define dummy work function that writes to a file
# Note: This function will be executed on a worker process
function work_fn(job_id) function work_fn(job_id)
push!(received_job_id, job_id) output_file = joinpath(test_dir, job_id * ".txt")
write(output_file, job_id)
return return
end end
@@ -24,10 +28,16 @@ function jobs_test_usage()
put!(processor, "test") put!(processor, "test")
# Wait for job to complete # Wait for job to complete
sleep(0.1) # Increased timeout to account for worker process startup
sleep(2)
stop(processor) stop(processor)
# Check that the work function was called with correct job_id # Check that the work function was called with correct job_id
@test received_job_id[1] == "test" output_file = joinpath(test_dir, "test.txt")
@test isfile(output_file)
@test read(output_file, String) == "test"
# Clean up
rm(test_dir; recursive = true)
end end
end end

View File

@@ -13,7 +13,7 @@ function server_test_usage()
# Submit test case # Submit test case
response = HTTP.post( response = HTTP.post(
"http://$HOST:$PORT/submit", "http://$HOST:$PORT/api/submit",
["Content-Type" => "application/gzip"], ["Content-Type" => "application/gzip"],
compressed_data, compressed_data,
) )
@@ -26,7 +26,7 @@ function server_test_usage()
@test length(job_id) == 16 @test length(job_id) == 16
# Wait for jobs to finish # Wait for jobs to finish
sleep(0.1) sleep(5)
while isbusy(server.processor) while isbusy(server.processor)
sleep(0.1) sleep(0.1)
end end
@@ -43,7 +43,7 @@ function server_test_usage()
@test saved_data == compressed_data @test saved_data == compressed_data
# Query job information # Query job information
view_response = HTTP.get("http://$HOST:$PORT/jobs/$job_id/view") view_response = HTTP.get("http://$HOST:$PORT/api/jobs/$job_id/view")
@test view_response.status == 200 @test view_response.status == 200
# Check response # Check response