Scheduler Orchestrator Service - Matilda
The Scheduler Orchestrator Service is the central "brain" of the Dante GPU Platform. Its primary function is to receive job requests, find suitable and available GPU providers, assign tasks to them, and monitor the overall progress of jobs.
1. Responsibilities
The Scheduler Orchestrator Service is tasked with the following:
Job Consumption: It subscribes to NATS subjects where new job requests are published (typically by the API Gateway) and dequeues these jobs for processing.
Provider Discovery: It communicates with the Provider Registry Service to get an up-to-date list of available GPU providers and their specifications (like GPU model, VRAM, status).
Scheduling Logic: It implements algorithms to match incoming jobs with the most appropriate providers based on job requirements (e.g., GPU type, count) and provider capabilities.
Billing Validation: Before dispatching a job, it integrates with the Billing Service to validate if the user has sufficient funds or meets billing criteria for the estimated job cost.
Task Dispatching: Once a suitable provider is found and billing is validated, it sends the task (derived from the job details) to the chosen provider's daemon, typically via NATS messages.
State Management: It maintains the state of jobs (e.g., pending, searching, dispatched, running, completed, failed) persistently in a database (PostgreSQL).
Status Tracking: It is intended to monitor the status of dispatched jobs. The mechanism for receiving updates from provider daemons (a NATS subscription or polling) is not yet fully implemented.
Service Registration: It registers itself with Consul for service discovery by other components of the Dante platform.
2. Tech Stack
The service utilizes the following technologies:
Programming Language: Go (version 1.22+)
Messaging Queue: NATS (including JetStream for durable job queues)
Service Discovery: HashiCorp Consul (via Consul API Client)
Database: PostgreSQL (using pgxpool for connection pooling)
HTTP Routing: Chi router (for health checks and potential admin API)
Logging: Zap for structured logging.
Client for Provider Registry: Custom HTTP client.
Client for Billing Service: Custom HTTP client.
3. Core Components and Logic
3.1. Configuration (configs/config.yaml)
The service's behavior is driven by configs/config.yaml. Key parameters include:
port: Listening port for its HTTP server (e.g., health checks).
log_level: Logging verbosity.
database_url: Connection string for PostgreSQL.
consul_address, service_name, service_id_prefix, service_tags, health_check_path, health_check_interval, health_check_timeout: Parameters for Consul registration and health checking.
nats_address, nats_job_submission_subject, nats_job_queue_group, nats_task_dispatch_subject_prefix, nats_job_status_update_subject_prefix: NATS connection and subject configuration for job consumption and task dispatching.
provider_registry_service_name: Name of the Provider Registry service in Consul for discovery.
scheduling_strategy, job_default_priority: Configuration for scheduling behavior.
provider_query_timeout: Timeout for requests to the Provider Registry service.
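As a rough illustration of how these keys might map onto a Go struct, the sketch below mirrors the parameter names listed above. The field types, the yaml struct tags, the choice of which keys are required, and every value in main are assumptions made for this example; the service's actual config package may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the keys listed for configs/config.yaml.
// Field types and yaml tags are assumptions, not the service's real struct.
type Config struct {
	// HTTP server, logging, and database.
	Port        int    `yaml:"port"`
	LogLevel    string `yaml:"log_level"`
	DatabaseURL string `yaml:"database_url"`

	// Consul registration and health checking.
	ConsulAddress       string        `yaml:"consul_address"`
	ServiceName         string        `yaml:"service_name"`
	ServiceIDPrefix     string        `yaml:"service_id_prefix"`
	ServiceTags         []string      `yaml:"service_tags"`
	HealthCheckPath     string        `yaml:"health_check_path"`
	HealthCheckInterval time.Duration `yaml:"health_check_interval"`
	HealthCheckTimeout  time.Duration `yaml:"health_check_timeout"`

	// NATS connection and subjects.
	NatsAddress                      string `yaml:"nats_address"`
	NatsJobSubmissionSubject         string `yaml:"nats_job_submission_subject"`
	NatsJobQueueGroup                string `yaml:"nats_job_queue_group"`
	NatsTaskDispatchSubjectPrefix    string `yaml:"nats_task_dispatch_subject_prefix"`
	NatsJobStatusUpdateSubjectPrefix string `yaml:"nats_job_status_update_subject_prefix"`

	// Provider Registry discovery and scheduling behavior.
	ProviderRegistryServiceName string        `yaml:"provider_registry_service_name"`
	SchedulingStrategy          string        `yaml:"scheduling_strategy"`
	JobDefaultPriority          int           `yaml:"job_default_priority"`
	ProviderQueryTimeout        time.Duration `yaml:"provider_query_timeout"`
}

// Validate reports every missing or invalid setting at once.
// Which keys are mandatory is an assumption of this sketch.
func (c Config) Validate() error {
	var errs []error
	if c.DatabaseURL == "" {
		errs = append(errs, errors.New("database_url is required"))
	}
	if c.NatsAddress == "" {
		errs = append(errs, errors.New("nats_address is required"))
	}
	if c.NatsJobSubmissionSubject == "" {
		errs = append(errs, errors.New("nats_job_submission_subject is required"))
	}
	if c.ProviderQueryTimeout <= 0 {
		errs = append(errs, errors.New("provider_query_timeout must be positive"))
	}
	return errors.Join(errs...)
}

func main() {
	// Placeholder values for illustration only.
	cfg := Config{
		Port:                     8080,
		DatabaseURL:              "postgres://localhost:5432/dante",
		NatsAddress:              "nats://localhost:4222",
		NatsJobSubmissionSubject: "jobs.submitted",
		HealthCheckPath:          "/health",
		ProviderQueryTimeout:     5 * time.Second,
	}
	fmt.Println("validation error:", cfg.Validate())
	fmt.Println("empty config error:", Config{}.Validate())
}
```

Collecting all problems with errors.Join lets an operator fix a broken config file in one pass instead of one error per restart.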
3.2. Data Models (internal/models/)
Job: Represents a compute job submitted to the platform, received via NATS. Includes ID, UserID, Type, Name, Priority, resource requirements (GPUType, GPUCount), parameters, and submission timestamp.
SchedulerJobState: Defines the internal states of a job managed by the scheduler (e.g., Pending, Searching, Dispatched, Running, Completed, Failed, Cancelled).
InternalJobRepresentation: A wrapper around the Job model that includes its current State, assigned ProviderID, scheduling Attempts, LastError, and timestamps (ReceivedAt, UpdatedAt).
JobRecord: Represents the job structure for database persistence, adapting InternalJobRepresentation. JobDetails (the original Job object) is stored as JSONB.
Task: Represents the unit of work dispatched to a provider daemon. It's derived from a Job and includes details like JobID, UserID, JobType, JobParams, GPUTypeNeeded, GPUCountNeeded, AssignedProviderID, and DispatchedAt timestamp.
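The models above could look roughly like the following Go types. This is a sketch built only from the field names in the descriptions: the Go types, JSON tags, state string values, and the NewTask helper are assumptions rather than the contents of internal/models/.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// SchedulerJobState enumerates the scheduler's internal job states.
// The string values are assumptions.
type SchedulerJobState string

const (
	JobStatePending    SchedulerJobState = "pending"
	JobStateSearching  SchedulerJobState = "searching"
	JobStateDispatched SchedulerJobState = "dispatched"
	JobStateRunning    SchedulerJobState = "running"
	JobStateCompleted  SchedulerJobState = "completed"
	JobStateFailed     SchedulerJobState = "failed"
	JobStateCancelled  SchedulerJobState = "cancelled"
)

// Job is the request received via NATS.
type Job struct {
	ID          string         `json:"id"`
	UserID      string         `json:"user_id"`
	Type        string         `json:"type"`
	Name        string         `json:"name"`
	Priority    int            `json:"priority"`
	GPUType     string         `json:"gpu_type"`
	GPUCount    int            `json:"gpu_count"`
	Params      map[string]any `json:"params"`
	SubmittedAt time.Time      `json:"submitted_at"`
}

// InternalJobRepresentation wraps a Job with scheduler bookkeeping.
type InternalJobRepresentation struct {
	Job        Job
	State      SchedulerJobState
	ProviderID string
	Attempts   int
	LastError  string
	ReceivedAt time.Time
	UpdatedAt  time.Time
}

// Task is the unit of work sent to a provider daemon.
type Task struct {
	JobID              string         `json:"job_id"`
	UserID             string         `json:"user_id"`
	JobType            string         `json:"job_type"`
	JobParams          map[string]any `json:"job_params"`
	GPUTypeNeeded      string         `json:"gpu_type_needed"`
	GPUCountNeeded     int            `json:"gpu_count_needed"`
	AssignedProviderID string         `json:"assigned_provider_id"`
	DispatchedAt       time.Time      `json:"dispatched_at"`
}

// NewTask derives a Task from a Job for the chosen provider.
// Hypothetical helper, not taken from the codebase.
func NewTask(job Job, providerID string, now time.Time) Task {
	return Task{
		JobID:              job.ID,
		UserID:             job.UserID,
		JobType:            job.Type,
		JobParams:          job.Params,
		GPUTypeNeeded:      job.GPUType,
		GPUCountNeeded:     job.GPUCount,
		AssignedProviderID: providerID,
		DispatchedAt:       now,
	}
}

func main() {
	job := Job{ID: "job-1", UserID: "user-1", Type: "training", GPUType: "A100", GPUCount: 2}
	internal := InternalJobRepresentation{Job: job, State: JobStatePending, ReceivedAt: time.Now().UTC()}

	task := NewTask(internal.Job, "provider-7", time.Now().UTC())
	payload, err := json.Marshal(task)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))
}
```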
3.3. Job Store (internal/store/)
JobStore Interface: Defines methods for job state persistence (SaveJob, GetJob, UpdateJobState, GetJobsByState, GetRetryableJobs, DeleteJob, Initialize, Close).
PostgresJobStore: Implements JobStore using PostgreSQL.
Initialization: Creates a jobs table with appropriate columns (including job_details as JSONB) and indexes for efficient querying.
Operations:
SaveJob: Uses an UPSERT (INSERT ... ON CONFLICT ... DO UPDATE) to save or update job records.
GetJob: Retrieves a job by its ID.
UpdateJobState: Updates state, provider ID, last error, and attempts for a job.
GetJobsByState, GetRetryableJobs: Retrieve lists of jobs based on state and retry eligibility, ordered by priority and update time.
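To make the store contract concrete without a database, here is a minimal in-memory stand-in that follows the same semantics: SaveJob behaves as an upsert, and GetJobsByState orders results by priority and then update time. The method signatures, the trimmed JobRecord fields, the ErrJobNotFound sentinel, and the "higher priority first" ordering are all assumptions; the PostgreSQL implementation is not shown in this document.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// JobRecord is a trimmed, hypothetical version of the persisted job row.
type JobRecord struct {
	ID         string
	State      string
	ProviderID string
	LastError  string
	Attempts   int
	Priority   int
	UpdatedAt  time.Time
}

// ErrJobNotFound is a hypothetical sentinel for missing jobs.
var ErrJobNotFound = errors.New("job not found")

// JobStore lists a subset of the methods named above; signatures are assumed.
type JobStore interface {
	SaveJob(rec JobRecord) error
	GetJob(id string) (JobRecord, error)
	UpdateJobState(id, state, providerID, lastError string, attempts int) error
	GetJobsByState(state string) ([]JobRecord, error)
}

// MemoryJobStore is an in-memory stand-in for PostgresJobStore.
type MemoryJobStore struct {
	mu   sync.Mutex
	jobs map[string]JobRecord
}

func NewMemoryJobStore() *MemoryJobStore {
	return &MemoryJobStore{jobs: make(map[string]JobRecord)}
}

// SaveJob inserts the record or overwrites an existing one (upsert semantics).
func (s *MemoryJobStore) SaveJob(rec JobRecord) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec.UpdatedAt = time.Now()
	s.jobs[rec.ID] = rec
	return nil
}

func (s *MemoryJobStore) GetJob(id string) (JobRecord, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, ok := s.jobs[id]
	if !ok {
		return JobRecord{}, ErrJobNotFound
	}
	return rec, nil
}

func (s *MemoryJobStore) UpdateJobState(id, state, providerID, lastError string, attempts int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	rec, ok := s.jobs[id]
	if !ok {
		return ErrJobNotFound
	}
	rec.State, rec.ProviderID, rec.LastError, rec.Attempts = state, providerID, lastError, attempts
	rec.UpdatedAt = time.Now()
	s.jobs[id] = rec
	return nil
}

// GetJobsByState returns matching jobs, highest priority first, then oldest update first.
func (s *MemoryJobStore) GetJobsByState(state string) ([]JobRecord, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []JobRecord
	for _, rec := range s.jobs {
		if rec.State == state {
			out = append(out, rec)
		}
	}
	sort.Slice(out, func(i, j int) bool {
		if out[i].Priority != out[j].Priority {
			return out[i].Priority > out[j].Priority
		}
		return out[i].UpdatedAt.Before(out[j].UpdatedAt)
	})
	return out, nil
}

func main() {
	var store JobStore = NewMemoryJobStore()
	_ = store.SaveJob(JobRecord{ID: "job-1", State: "pending", Priority: 5})
	_ = store.UpdateJobState("job-1", "dispatched", "provider-7", "", 1)
	rec, err := store.GetJob("job-1")
	fmt.Println(rec.State, rec.ProviderID, err)
}
```

A stand-in like this can also be handy in unit tests for the consumer, since it satisfies the same interface as the PostgreSQL-backed store.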
3.4. NATS Client & Job Consumer (internal/nats/, internal/scheduler/)
NATS Connection: Establishes and manages a robust connection to the NATS server, including JetStream context.
EnsureStream: Ensures that the required JetStream stream for job submissions exists.
JobConsumer:
Subscription: Creates a durable pull subscription to the NATS subject specified for job submissions (e.g., jobs.submitted) using JetStream.
Message Handling (handleMessage):
Receives raw job messages from NATS.
Unmarshals job data.
Checks if the job already exists in the JobStore; if so, it loads its current state. Otherwise, it creates a new InternalJobRepresentation and saves it.
Calls scheduleJob to attempt scheduling.
Updates the job's state in the JobStore based on the scheduling outcome.
Acknowledges (ACKs) or negatively acknowledges (NAKs) the NATS message based on processing success or failure to ensure at-least-once delivery.
Shutdown (Stop): Gracefully drains the NATS subscription.
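The message-handling steps above can be sketched without a NATS dependency by hiding the message behind a small interface. Everything named here (the Message interface, Consumer, TrackedJob, the map-backed store, and the decision to ACK malformed payloads so they are not redelivered) is a hypothetical simplification, not the service's actual handleMessage.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type State string

const (
	Pending    State = "pending"
	Dispatched State = "dispatched"
	Failed     State = "failed"
)

type Job struct {
	ID      string `json:"id"`
	GPUType string `json:"gpu_type"`
}

// TrackedJob is a cut-down stand-in for InternalJobRepresentation.
type TrackedJob struct {
	Job       Job
	State     State
	Attempts  int
	LastError string
}

// Message is a hypothetical abstraction over a JetStream message.
type Message interface {
	Data() []byte
	Ack() error
	Nak() error
}

// Consumer holds the job store (a plain map here) and the scheduling function.
type Consumer struct {
	Store    map[string]*TrackedJob
	Schedule func(*TrackedJob) (State, error)
}

// HandleMessage follows the steps listed above: unmarshal, load or create,
// schedule, persist the outcome, then ACK or NAK.
func (c *Consumer) HandleMessage(m Message) error {
	var job Job
	if err := json.Unmarshal(m.Data(), &job); err != nil {
		// Assumption: a malformed payload can never succeed, so drop it.
		return m.Ack()
	}
	tracked, ok := c.Store[job.ID]
	if !ok {
		tracked = &TrackedJob{Job: job, State: Pending}
		c.Store[job.ID] = tracked
	}
	state, err := c.Schedule(tracked)
	tracked.State = state
	tracked.Attempts++
	tracked.LastError = ""
	if err != nil {
		tracked.LastError = err.Error()
	}
	if state == Pending {
		return m.Nak() // retry later, e.g. no provider was available
	}
	return m.Ack() // dispatched or terminally failed: prevent redelivery
}

// fakeMsg records whether it was ACK'd or NAK'd.
type fakeMsg struct {
	data         []byte
	acked, naked bool
}

func (f *fakeMsg) Data() []byte { return f.data }
func (f *fakeMsg) Ack() error   { f.acked = true; return nil }
func (f *fakeMsg) Nak() error   { f.naked = true; return nil }

func main() {
	c := &Consumer{
		Store: map[string]*TrackedJob{},
		Schedule: func(*TrackedJob) (State, error) {
			return Pending, errors.New("no suitable provider found")
		},
	}
	msg := &fakeMsg{data: []byte(`{"id":"job-1","gpu_type":"A100"}`)}
	_ = c.HandleMessage(msg)
	fmt.Println("acked:", msg.acked, "naked:", msg.naked, "attempts:", c.Store["job-1"].Attempts)
}
```

Because redelivered messages find the existing entry in the store, a retry increments Attempts on the same record instead of creating a duplicate job.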
3.5. Scheduling Logic (internal/scheduler/consumer.go - scheduleJob)
Provider Query: Fetches available providers from the Provider Registry Service.
Provider Matching:
Iterates through available providers.
Checks if provider status is idle.
Matches based on requested GPUType (case-insensitive) and GPUCount.
Selects the first suitable provider found (current strategy).
Billing Integration:
If a suitable provider is found, it calls the Billing Service to validate job requirements (e.g., user balance against estimated cost).
If validation passes, it attempts to start a billing session with the Billing Service.
Task Creation & Dispatch:
If a provider is selected and billing validation passes, a Task object is created from the job details.
The task is marshaled to JSON and published to a NATS subject specific to the chosen provider and job (e.g., tasks.dispatch.{providerID}.{jobID}).
State Update: Updates the internal job representation's state to Dispatched or Pending (if no provider found or dispatch failed) and clears or sets LastError accordingly.
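The scheduling path described in this section can be sketched end to end as follows. This is an illustration under stated assumptions: the Provider and Job fields, the Billing interface signatures, the Publisher function type, treating a provider as suitable when it has at least the requested GPU count, and mapping billing failures to "failed" are all choices made for this example. Only the idle check, the case-insensitive GPU type match, the first-fit selection, and the tasks.dispatch.{providerID}.{jobID} subject shape come from the text above.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

type Provider struct {
	ID       string
	Status   string
	GPUType  string
	GPUCount int
}

type Job struct {
	ID       string `json:"job_id"`
	UserID   string `json:"user_id"`
	GPUType  string `json:"gpu_type_needed"`
	GPUCount int    `json:"gpu_count_needed"`
}

// Billing is a hypothetical view of the billing client used by the scheduler.
type Billing interface {
	ValidateJobRequirements(job Job) error
	StartSession(job Job, providerID string) error
}

// Publisher stands in for publishing a message to NATS.
type Publisher func(subject string, payload []byte) error

const taskDispatchPrefix = "tasks.dispatch" // example prefix from the text

// selectProvider returns the first idle provider with a matching GPU type
// (case-insensitive) and enough GPUs (">=" is an assumption).
func selectProvider(job Job, providers []Provider) (Provider, bool) {
	for _, p := range providers {
		if p.Status == "idle" && strings.EqualFold(p.GPUType, job.GPUType) && p.GPUCount >= job.GPUCount {
			return p, true
		}
	}
	return Provider{}, false
}

func dispatchSubject(prefix, providerID, jobID string) string {
	return fmt.Sprintf("%s.%s.%s", prefix, providerID, jobID)
}

// scheduleJob returns the resulting state, the chosen provider ID, and any error.
func scheduleJob(job Job, providers []Provider, billing Billing, publish Publisher) (string, string, error) {
	p, ok := selectProvider(job, providers)
	if !ok {
		return "pending", "", errors.New("no suitable provider found")
	}
	if err := billing.ValidateJobRequirements(job); err != nil {
		return "failed", "", fmt.Errorf("billing validation: %w", err)
	}
	if err := billing.StartSession(job, p.ID); err != nil {
		return "failed", "", fmt.Errorf("billing session: %w", err)
	}
	payload, err := json.Marshal(job) // the real Task carries more fields
	if err != nil {
		return "failed", "", fmt.Errorf("marshal task: %w", err)
	}
	if err := publish(dispatchSubject(taskDispatchPrefix, p.ID, job.ID), payload); err != nil {
		return "pending", "", fmt.Errorf("dispatch: %w", err)
	}
	return "dispatched", p.ID, nil
}

// allowAll is a billing stub that approves everything.
type allowAll struct{}

func (allowAll) ValidateJobRequirements(Job) error { return nil }
func (allowAll) StartSession(Job, string) error    { return nil }

func main() {
	providers := []Provider{
		{ID: "p0", Status: "busy", GPUType: "A100", GPUCount: 8},
		{ID: "p1", Status: "idle", GPUType: "a100", GPUCount: 4},
	}
	job := Job{ID: "j1", UserID: "u1", GPUType: "A100", GPUCount: 2}
	publish := func(subject string, payload []byte) error {
		fmt.Println("publish", subject, string(payload))
		return nil
	}
	state, providerID, err := scheduleJob(job, providers, allowAll{}, publish)
	fmt.Println(state, providerID, err)
}
```

Keeping provider selection in its own function makes it straightforward to swap first-fit for another strategy later without touching the billing and dispatch steps.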
3.6. External Service Clients
Provider Registry Client (internal/clients/provider_registry.go):
Handles communication with the Provider Registry Service.
getServiceAddress(): Discovers healthy instances of the provider-registry service via Consul, with simple caching and random instance selection.
ListAvailableProviders(): Fetches the list of providers from the discovered registry URL (/providers endpoint).
Billing Client (internal/billing/integration.go):
Handles communication with the Billing Payment Service.
ValidateJobRequirements(): Checks if a user has sufficient balance and meets other billing criteria.
StartSession(): Sends a request to the billing service to initiate a billing session for a job.
(Other methods like EndSession, GetSessionStatus, EstimateJobCost are defined but might not be fully used by the scheduler directly yet).
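The "simple caching and random instance selection" used for discovery might look like the sketch below. The Consul lookup is replaced by a hypothetical Resolver function so the example stays dependency-free; the CachedDiscovery type, the TTL-based expiry, and the addresses are assumptions, not the client's real internals.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Resolver stands in for a Consul health query that returns the
// addresses of healthy instances of a service.
type Resolver func(serviceName string) ([]string, error)

var ErrNoHealthyInstances = errors.New("no healthy instances")

// CachedDiscovery caches the resolved instance list for a fixed TTL.
type CachedDiscovery struct {
	mu        sync.Mutex
	resolve   Resolver
	ttl       time.Duration
	cached    []string
	fetchedAt time.Time
}

func NewCachedDiscovery(resolve Resolver, ttl time.Duration) *CachedDiscovery {
	return &CachedDiscovery{resolve: resolve, ttl: ttl}
}

// ServiceAddress returns a random healthy instance, refreshing the cache
// when it is empty or older than the TTL.
func (d *CachedDiscovery) ServiceAddress(name string) (string, error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	if len(d.cached) == 0 || time.Since(d.fetchedAt) >= d.ttl {
		addrs, err := d.resolve(name)
		if err != nil {
			return "", fmt.Errorf("resolve %s: %w", name, err)
		}
		if len(addrs) == 0 {
			return "", ErrNoHealthyInstances
		}
		d.cached, d.fetchedAt = addrs, time.Now()
	}
	return d.cached[rand.Intn(len(d.cached))], nil
}

func main() {
	lookups := 0
	resolver := func(name string) ([]string, error) {
		lookups++
		// Placeholder addresses for illustration.
		return []string{"10.0.0.1:8002", "10.0.0.2:8002"}, nil
	}
	d := NewCachedDiscovery(resolver, 30*time.Second)

	for i := 0; i < 3; i++ {
		addr, err := d.ServiceAddress("provider-registry")
		fmt.Println(addr, err)
	}
	fmt.Println("lookups performed:", lookups)
}
```

Random selection spreads requests across registry instances without any shared state, at the cost of not accounting for instance load.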
3.7. Consul Integration (internal/consul/)
Handles connection to Consul agent and service registration/deregistration.
Registers the Scheduler Orchestrator service with its health check endpoint.
3.8. HTTP Server (internal/server/)
A minimal HTTP server is set up (using Chi router) primarily for health checks required by Consul.
The health check endpoint (/health) verifies connectivity to the database and NATS.
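A health endpoint of this kind could be structured as below. The sketch uses only the standard library's net/http rather than Chi, and the Check type, the runChecks helper, the JSON response shape, and the stubbed dependency checks are assumptions; in the real service the checks would presumably ping the pgx pool and inspect the NATS connection.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// Check probes one dependency and returns nil when it is reachable.
type Check func(ctx context.Context) error

// runChecks executes every check and returns 200 only if all pass,
// otherwise 503, along with a per-dependency result.
func runChecks(ctx context.Context, checks map[string]Check) (int, map[string]string) {
	status := http.StatusOK
	results := make(map[string]string, len(checks))
	for name, check := range checks {
		if err := check(ctx); err != nil {
			status = http.StatusServiceUnavailable
			results[name] = err.Error()
			continue
		}
		results[name] = "ok"
	}
	return status, results
}

// healthHandler wraps runChecks in an HTTP handler with a per-request timeout.
func healthHandler(checks map[string]Check, timeout time.Duration) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), timeout)
		defer cancel()

		status, results := runChecks(ctx, checks)
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(status)
		_ = json.NewEncoder(w).Encode(results)
	}
}

// Stub checks standing in for a database ping and a NATS status check.
func alwaysOK(context.Context) error   { return nil }
func alwaysDown(context.Context) error { return errors.New("not connected") }

func main() {
	checks := map[string]Check{"database": alwaysOK, "nats": alwaysDown}

	// Exercise the handler in-process instead of binding a port.
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/health", nil)
	healthHandler(checks, 2*time.Second)(rec, req)

	fmt.Println(rec.Code, rec.Body.String())
}
```

Returning a non-2xx status when any dependency is down is what lets a Consul HTTP check mark the instance as unhealthy.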
4. Workflow
Startup:
Loads configuration.
Initializes logger, database connection (PostgreSQL), and JobStore (creates jobs table if needed).
Connects to Consul and registers itself.
Connects to NATS and JetStream, ensures the job submission stream exists.
Initializes clients for Provider Registry and Billing services.
Starts the JobConsumer which subscribes to the job submission NATS subject.
Starts its own HTTP server for health checks.
Job Reception & Initial Processing:
JobConsumer receives a Job message from NATS.
The job is unmarshalled. If already in the JobStore (e.g., from a previous failed attempt), its state is loaded. Otherwise, a new InternalJobRepresentation is created with JobStatePending and saved to the JobStore.
Scheduling Attempt (scheduleJob):
Job state is set to JobStateSearching.
The Provider Registry Service is queried for available providers.
Providers are filtered based on status (idle), GPU type, and GPU count.
If a suitable provider is found:
Billing validation is performed by calling the Billing Service.
If billing validation succeeds, a billing session is started.
A Task is created and dispatched to the provider via a NATS message on a provider-specific subject.
Job state is updated to JobStateDispatched in the InternalJobRepresentation.
If no suitable provider is found, or if billing validation/session start fails, or if NATS dispatch fails:
The job state is set to JobStatePending or JobStateFailed with an error message.
Post-Scheduling:
The JobConsumer updates the job's final state (e.g., Dispatched, Pending, Failed) and other details (ProviderID, LastError, Attempts) in the JobStore.
The NATS message for the job is ACK'd (if successfully dispatched, or terminally failed, to prevent redelivery) or NAK'd (if it should be retried later, e.g., no provider found).
Status Updates & Completion (Conceptual - full implementation may vary):
The Provider Daemon, after receiving and processing a task, would publish status updates (e.g., Running, Completed, Failed) to a NATS subject (e.g., jobs.status.{jobID}).
The Scheduler Orchestrator would subscribe to these status updates.
Upon receiving a Completed or Failed status, the Scheduler would update the job's state in its JobStore and potentially notify other services or the user (e.g., via API Gateway or another NATS message).
If a job is Completed, the Billing Service would be notified to finalize the billing session.
Shutdown:
Receives a termination signal (SIGINT, SIGTERM).
Stops the JobConsumer (drains NATS subscription).
Deregisters from Consul.
Shuts down its HTTP server.
Drains and closes the NATS connection.
Closes the database connection pool (implicitly via dbPool.Close() deferred in main).
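The shutdown sequence above can be sketched as an ordered list of steps that runs after a termination signal. The Step type, shutdownInOrder helper, and the no-op step bodies are hypothetical; the demo also stops waiting after a short timeout so that it terminates on its own, which a real service would not do.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// Step is one named shutdown action.
type Step struct {
	Name string
	Run  func() error
}

// errDemo is used only to show how a failing step is reported.
var errDemo = errors.New("demo failure")

// shutdownInOrder runs every step in sequence. A failing step does not
// stop later steps; all failures are returned together.
func shutdownInOrder(steps []Step) error {
	var errs []error
	for _, s := range steps {
		if err := s.Run(); err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", s.Name, err))
		}
	}
	return errors.Join(errs...)
}

func main() {
	// Cancel ctx when SIGINT or SIGTERM arrives.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	select {
	case <-ctx.Done():
		fmt.Println("termination signal received")
	case <-time.After(200 * time.Millisecond):
		// Demo only: a real service would wait on ctx.Done() alone.
		fmt.Println("demo timeout reached, shutting down")
	}

	// Order follows the workflow described above; bodies are placeholders.
	steps := []Step{
		{Name: "stop job consumer", Run: func() error { return nil }},
		{Name: "deregister from consul", Run: func() error { return errDemo }},
		{Name: "shut down http server", Run: func() error { return nil }},
		{Name: "drain and close nats", Run: func() error { return nil }},
		{Name: "close database pool", Run: func() error { return nil }},
	}
	if err := shutdownInOrder(steps); err != nil {
		fmt.Println("shutdown finished with errors:", err)
	}
}
```

Continuing past a failed step is a deliberate choice in this sketch: a Consul deregistration error should not prevent the NATS connection and database pool from being closed.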
5. API Endpoints (Minimal)
GET /health: Health check endpoint for Consul. Checks database and NATS connectivity.
6. Future Considerations
Advanced Scheduling Strategies: Implement more sophisticated scheduling algorithms beyond simple first-fit/round-robin (e.g., least-busy, cost-based, priority-based, location-aware, GPU capability matching).
Job Status API: Expose an API for querying job statuses, potentially through the API Gateway.
Resilience and Retry Logic: Enhance retry mechanisms for job scheduling and task dispatching, including configurable backoff strategies.
Job Queue Management: Implement more advanced queue management features (e.g., prioritization, reordering).
Task Status Subscription: Fully implement subscription to NATS subjects for real-time status updates from provider daemons.
Scalability: Ensure the scheduler can handle a high volume of incoming jobs and provider updates.
Resource Reservation: Mechanisms to reserve provider resources for a job.
Integration with Storage Service: For handling job inputs/outputs if paths are provided in job parameters.
Job Cancellation: Implement logic to handle job cancellation requests.