list-musicScheduler Orchestrator Service - Matilda

The Scheduler Orchestrator Service is the central "brain" of the Dante GPU Platform. Its primary function is to receive job requests, find suitable and available GPU providers, assign tasks to them, and monitor the overall progress of jobs.


1. Responsibilities

The Scheduler Orchestrator Service is tasked with the following:

  • Job Consumption: It subscribes to NATS subjects where new job requests are published (typically by the API Gateway) and dequeues these jobs for processing.

  • Provider Discovery: It communicates with the Provider Registry Service to get an up-to-date list of available GPU providers and their specifications (like GPU model, VRAM, status).

  • Scheduling Logic: It implements algorithms to match incoming jobs with the most appropriate providers based on job requirements (e.g., GPU type, count) and provider capabilities.

  • Billing Validation: Before dispatching a job, it integrates with the Billing Service to validate if the user has sufficient funds or meets billing criteria for the estimated job cost.

  • Task Dispatching: Once a suitable provider is found and billing is validated, it sends the task (derived from the job details) to the chosen provider's daemon, typically via NATS messages.

  • State Management: It maintains the state of jobs (e.g., pending, searching, dispatched, running, completed, failed) persistently in a database (PostgreSQL).

  • Status Tracking: It is expected to monitor the status of dispatched jobs (though the exact mechanism for receiving updates from provider daemons, like NATS or polling, needs full implementation).

  • Service Registration: It registers itself with Consul for service discovery by other components of the Dante platform.


2. Tech Stack

The service utilizes the following technologies:

  • Programming Language: Go (version 1.22+)

  • Messaging Queue: NATS (including JetStream for durable job queues)

  • Service Discovery: HashiCorp Consul (via Consul API Client)

  • Database: PostgreSQL (using pgxpool for connection pooling)

  • HTTP Routing: Chi router (for health checks and potential admin API)

  • Logging: Zap for structured logging.

  • Client for Provider Registry: Custom HTTP client.

  • Client for Billing Service: Custom HTTP client.


3. Core Components and Logic

3.1. Configuration (configs/config.yaml)

The service's behavior is driven by configs/config.yaml. Key parameters include:

  • port: Listening port for its HTTP server (e.g., health checks).

  • log_level: Logging verbosity.

  • database_url: Connection string for PostgreSQL.

  • consul_address, service_name, service_id_prefix, service_tags, health_check_path, health_check_interval, health_check_timeout: Parameters for Consul registration and health checking.

  • nats_address, nats_job_submission_subject, nats_job_queue_group, nats_task_dispatch_subject_prefix, nats_job_status_update_subject_prefix: NATS connection and subject configuration for job consumption and task dispatching.

  • provider_registry_service_name: Name of the Provider Registry service in Consul for discovery.

  • scheduling_strategy, job_default_priority: Configuration for scheduling behavior.

  • provider_query_timeout: Timeout for requests to the Provider Registry service.

3.2. Data Models (internal/models/)

  • Job: Represents a compute job submitted to the platform, received via NATS. Includes ID, UserID, Type, Name, Priority, resource requirements (GPUType, GPUCount), parameters, and submission timestamp.

  • SchedulerJobState: Defines the internal states of a job managed by the scheduler (e.g., Pending, Searching, Dispatched, Running, Completed, Failed, Cancelled).

  • InternalJobRepresentation: A wrapper around the Job model that includes its current State, assigned ProviderID, scheduling Attempts, LastError, and timestamps (ReceivedAt, UpdatedAt).

  • JobRecord: Represents the job structure for database persistence, adapting InternalJobRepresentation. JobDetails (the original Job object) is stored as JSONB.

  • Task: Represents the unit of work dispatched to a provider daemon. It's derived from a Job and includes details like JobID, UserID, JobType, JobParams, GPUTypeNeeded, GPUCountNeeded, AssignedProviderID, and DispatchedAt timestamp.

3.3. Job Store (internal/store/)

  • JobStore Interface: Defines methods for job state persistence (SaveJob, GetJob, UpdateJobState, GetJobsByState, GetRetryableJobs, DeleteJob, Initialize, Close).

  • PostgresJobStore: Implements JobStore using PostgreSQL.

    • Initialization: Creates a jobs table with appropriate columns (including job_details as JSONB) and indexes for efficient querying.

    • Operations:

      • SaveJob: Uses an UPSERT (INSERT ON CONFLICT DO UPDATE) to save or update job records.

      • GetJob: Retrieves a job by its ID.

      • UpdateJobState: Updates state, provider ID, last error, and attempts for a job.

      • GetJobsByState, GetRetryableJobs: Retrieve lists of jobs based on state and retry eligibility, ordered by priority and update time.

3.4. NATS Client & Job Consumer (internal/nats/, internal/scheduler/)

  • NATS Connection: Establishes and manages a robust connection to the NATS server, including JetStream context.

    • EnsureStream: Ensures that the required JetStream stream for job submissions exists.

  • JobConsumer:

    • Subscription: Creates a durable pull subscription to the NATS subject specified for job submissions (e.g., jobs.submitted) using JetStream.

    • Message Handling (handleMessage):

      1. Receives raw job messages from NATS.

      2. Unmarshals job data.

      3. Checks if the job already exists in the JobStore; if so, it loads its current state. Otherwise, it creates a new InternalJobRepresentation and saves it.

      4. Calls scheduleJob to attempt scheduling.

      5. Updates the job's state in the JobStore based on the scheduling outcome.

      6. Acknowledges (ACKs) or negatively acknowledges (NAKs) the NATS message based on processing success or failure to ensure at-least-once delivery.

    • Shutdown (Stop): Gracefully drains the NATS subscription.

3.5. Scheduling Logic (internal/scheduler/consumer.go - scheduleJob)

  • Provider Query: Fetches available providers from the Provider Registry Service.

  • Provider Matching:

    • Iterates through available providers.

    • Checks if provider status is idle.

    • Matches based on requested GPUType (case-insensitive) and GPUCount.

    • Selects the first suitable provider found (current strategy).

  • Billing Integration:

    • If a suitable provider is found, it calls the Billing Service to validate job requirements (e.g., user balance against estimated cost).

    • If validation passes, it attempts to start a billing session with the Billing Service.

  • Task Creation & Dispatch:

    • If a provider is selected and billing validation passes, a Task object is created from the job details.

    • The task is marshaled to JSON and published to a NATS subject specific to the chosen provider and job (e.g., tasks.dispatch.{providerID}.{jobID}).

  • State Update: Updates the internal job representation's state to Dispatched or Pending (if no provider found or dispatch failed) and clears/sets LastError.

3.6. External Service Clients

  • Provider Registry Client (internal/clients/provider_registry.go):

    • Handles communication with the Provider Registry Service.

    • getServiceAddress(): Discovers healthy instances of the provider-registry service via Consul, with simple caching and random instance selection.

    • ListAvailableProviders(): Fetches the list of providers from the discovered registry URL (/providers endpoint).

  • Billing Client (internal/billing/integration.go):

    • Handles communication with the Billing Payment Service.

    • ValidateJobRequirements(): Checks if a user has sufficient balance and meets other billing criteria.

    • StartSession(): Sends a request to the billing service to initiate a billing session for a job.

    • (Other methods like EndSession, GetSessionStatus, EstimateJobCost are defined but might not be fully used by the scheduler directly yet).

3.7. Consul Integration (internal/consul/)

  • Handles connection to Consul agent and service registration/deregistration.

  • Registers the Scheduler Orchestrator service with its health check endpoint.

3.8. HTTP Server (internal/server/)

  • A minimal HTTP server is set up (using Chi router) primarily for health checks required by Consul.

  • The health check endpoint (/health) verifies connectivity to the database and NATS.


4. Workflow

  1. Startup:

    • Loads configuration.

    • Initializes logger, database connection (PostgreSQL), and JobStore (creates jobs table if needed).

    • Connects to Consul and registers itself.

    • Connects to NATS and JetStream, ensures the job submission stream exists.

    • Initializes clients for Provider Registry and Billing services.

    • Starts the JobConsumer which subscribes to the job submission NATS subject.

    • Starts its own HTTP server for health checks.

  2. Job Reception & Initial Processing:

    • JobConsumer receives a Job message from NATS.

    • The job is unmarshalled. If already in the JobStore (e.g., from a previous failed attempt), its state is loaded. Otherwise, a new InternalJobRepresentation is created with JobStatePending and saved to the JobStore.

  3. Scheduling Attempt (scheduleJob):

    • Job state is set to JobStateSearching.

    • The Provider Registry Service is queried for available providers.

    • Providers are filtered based on status (idle), GPU type, and GPU count.

    • If a suitable provider is found:

      • Billing validation is performed by calling the Billing Service.

      • If billing validation succeeds, a billing session is started.

      • A Task is created and dispatched to the provider via a NATS message on a provider-specific subject.

      • Job state is updated to JobStateDispatched in the InternalJobRepresentation.

    • If no suitable provider is found, or if billing validation/session start fails, or if NATS dispatch fails:

      • The job state is set to JobStatePending or JobStateFailed with an error message.

  4. Post-Scheduling:

    • The JobConsumer updates the job's final state (e.g., Dispatched, Pending, Failed) and other details (ProviderID, LastError, Attempts) in the JobStore.

    • The NATS message for the job is ACK'd (if successfully dispatched or terminally failed to prevent redelivery) or NAK'd (if it should be retried later, e.g., no provider found).

  5. Status Updates & Completion (Conceptual - full implementation may vary):

    • The Provider Daemon, after receiving and processing a task, would publish status updates (e.g., Running, Completed, Failed) to a NATS subject (e.g., jobs.status.{jobID}).

    • The Scheduler Orchestrator would subscribe to these status updates.

    • Upon receiving a Completed or Failed status, the Scheduler would update the job's state in its JobStore and potentially notify other services or the user (e.g., via API Gateway or another NATS message).

    • If a job is Completed, the Billing Service would be notified to finalize the billing session.

  6. Shutdown:

    • Receives a termination signal (SIGINT, SIGTERM).

    • Stops the JobConsumer (drains NATS subscription).

    • Deregisters from Consul.

    • Shuts down its HTTP server.

    • Drains and closes the NATS connection.

    • Closes the database connection pool (implicitly via dbPool.Close() deferred in main).


5. API Endpoints (Minimal)

  • GET /health: Health check endpoint for Consul. Checks database and NATS connectivity.


6. Future Considerations

  • Advanced Scheduling Strategies: Implement more sophisticated scheduling algorithms beyond simple first-fit/round-robin (e.g., least-busy, cost-based, priority-based, location-aware, GPU capability matching).

  • Job Status API: Expose an API for querying job statuses, potentially through the API Gateway.

  • Resilience and Retry Logic: Enhance retry mechanisms for job scheduling and task dispatching, including configurable backoff strategies.

  • Job Queue Management: Implement more advanced queue management features (e.g., prioritization, reordering).

  • Task Status Subscription: Fully implement subscription to NATS subjects for real-time status updates from provider daemons.

  • Scalability: Ensure the scheduler can handle a high volume of incoming jobs and provider updates.

  • Resource Reservation: Mechanisms to reserve provider resources for a job.

  • Integration with Storage Service: For handling job inputs/outputs if paths are provided in job parameters.

  • Job Cancellation: Implement logic to handle job cancellation requests.

Last updated