API Gateway - Siger

The API Gateway is the primary entry point for all external client interactions with the Dante GPU Platform. It routes requests to the appropriate backend services, handles authentication and authorization, and is the intended home for other cross-cutting concerns such as rate limiting and request/response transformation (both still planned).


1. Responsibilities

The API Gateway has the following key responsibilities:

  • Request Routing: Directs incoming HTTP requests to the correct microservices (e.g., Auth Service, Provider Registry, Scheduler Orchestrator, Billing Service, Storage Service). This is often based on path prefixes.

  • Authentication & Authorization: Verifies user identity (typically via JWT tokens) and ensures users have the necessary permissions to access requested resources or perform actions.

  • Service Discovery: Integrates with Consul to dynamically discover the addresses and ports of backend services.

  • Load Balancing: Distributes requests across multiple instances of a backend service. It currently uses a simple round-robin strategy (RoundRobinBalancer); more sophisticated strategies could be implemented.

  • Job Submission: Provides endpoints for users to submit new compute jobs, which are then published to a NATS stream for the Scheduler Orchestrator to consume.

  • API Aggregation: Can combine results from multiple services into a single response for the client.

  • Rate Limiting: (Planned) To prevent abuse and ensure fair usage.

  • Request/Response Transformation: (Planned) Modifying requests or responses as needed.


2. Tech Stack

The API Gateway is built using:

  • Programming Language: Go

  • HTTP Routing: Chi router

  • Service Discovery: HashiCorp Consul

  • Messaging Queue (for job submission): NATS (JetStream)

  • Logging: Zap for structured logging

  • Authentication: JWT (JSON Web Tokens)

  • Configuration: YAML files


3. Core Components and Logic

3.1. Configuration (internal/config/config.go)

The API Gateway's behavior is determined by its configuration file (configs/config.yaml).

Key configuration sections include:

  • server: Port and host for the HTTP server.

  • consul: Address for the Consul agent and service registration details.

  • nats: NATS server address and subject for job submissions.

  • jwt: Secret key for JWT signing and verification, and the token expiry (in minutes).

  • services: Configuration for backend services, including their names in Consul and path prefixes for routing.

  • billing_service: Specific configuration for the billing service client.

// Example of service configuration within the main Config struct
type ServiceConfig struct {
	Name         string          `yaml:"name"`          // Name of the service in Consul
	PathPrefix   string          `yaml:"path_prefix"`   // Path prefix to route to this service
	Timeout      time.Duration   `yaml:"timeout"`       // Request timeout for this service
	AuthRequired bool            `yaml:"auth_required"` // Whether requests to this service require authentication
	StripPrefix  bool            `yaml:"strip_prefix"`  // Whether to strip the PathPrefix before forwarding
	AllowedRoles []string        `yaml:"allowed_roles"` // Roles allowed to access this service if auth is required
	RateLimit    RateLimitConfig `yaml:"rate_limit"`    // Rate limiting configuration (RateLimitConfig is defined elsewhere, not shown)
}

type Config struct {
	// ... other fields
	Services       map[string]ServiceConfig `yaml:"services"`
	// ... other fields
}
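
The YAML keys in configs/config.yaml map directly onto these struct tags. As a hedged illustration, the sketch below builds an equivalent services map in Go and runs the kind of sanity check a gateway might perform at startup. The service names, prefixes, timeouts, and the validateServices helper are all hypothetical, and the RateLimit field is omitted because RateLimitConfig is not shown.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"time"
)

// ServiceConfig mirrors the documented fields (RateLimit omitted).
type ServiceConfig struct {
	Name         string        `yaml:"name"`
	PathPrefix   string        `yaml:"path_prefix"`
	Timeout      time.Duration `yaml:"timeout"`
	AuthRequired bool          `yaml:"auth_required"`
	StripPrefix  bool          `yaml:"strip_prefix"`
	AllowedRoles []string      `yaml:"allowed_roles"`
}

// validateServices is a hypothetical startup check: every prefix must start
// with "/", prefixes must be unique, and allowed_roles only makes sense when
// auth_required is set.
func validateServices(services map[string]ServiceConfig) error {
	keys := make([]string, 0, len(services))
	for k := range services {
		keys = append(keys, k)
	}
	sort.Strings(keys) // iterate in a fixed order so errors are deterministic

	seen := make(map[string]string) // path_prefix -> service key
	for _, key := range keys {
		svc := services[key]
		if !strings.HasPrefix(svc.PathPrefix, "/") {
			return fmt.Errorf("service %q: path_prefix %q must start with /", key, svc.PathPrefix)
		}
		if other, dup := seen[svc.PathPrefix]; dup {
			return fmt.Errorf("services %q and %q share path_prefix %q", other, key, svc.PathPrefix)
		}
		seen[svc.PathPrefix] = key
		if len(svc.AllowedRoles) > 0 && !svc.AuthRequired {
			return fmt.Errorf("service %q: allowed_roles requires auth_required", key)
		}
	}
	return nil
}

func main() {
	// Hypothetical entries, equivalent to a `services:` block in config.yaml.
	services := map[string]ServiceConfig{
		"providers": {Name: "provider-registry", PathPrefix: "/providers", Timeout: 10 * time.Second},
		"storage":   {Name: "storage-service", PathPrefix: "/storage", Timeout: 60 * time.Second, AuthRequired: true, AllowedRoles: []string{"user", "admin"}},
	}
	fmt.Println(validateServices(services)) // <nil>
}
```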

3.2. HTTP Server and Routing (cmd/main.go)

The main function loads the configuration, initializes the logger, Consul client, and NATS client, sets up routing with chi, and starts the HTTP server.

// Snippet from cmd/main.go showing router setup
func main() {
	// ... (config loading, logger init, client inits) ...

	r := chi.NewRouter()

	// Middleware stack
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(custommiddleware.NewStructuredLogger(logger)) // Custom Zap logger
	r.Use(middleware.Recoverer)
	r.Use(middleware.Timeout(cfg.Server.Timeout)) // Global request timeout

	// Auth middleware
	authMiddleware := custommiddleware.NewAuthMiddleware(cfg.JWT.SecretKey, logger, consulClient, cfg.Services["auth"].Name) // Example for auth service

	// Register handlers
	authHandler := handlers.NewAuthHandler(consulClient, cfg.Services["auth"].Name, logger, cfg.JWT.SecretKey, time.Duration(cfg.JWT.ExpiryMinutes)*time.Minute)
	// authMiddleware is passed to Routes instead of applied globally, so public routes such as login/register can stay unauthenticated
	r.Mount("/auth", authHandler.Routes(authMiddleware))

	jobHandler := handlers.NewJobHandler(natsClient, cfg.NATS.JobSubmissionSubject, logger)
	r.With(authMiddleware.Authorize).Post("/jobs/submit", jobHandler.SubmitJobHandler) // Example: Job submission requires authorization

	billingProxyHandler := handlers.NewBillingServiceProxy(consulClient, cfg.Services["billing"].Name, logger, cfg.BillingService)
	r.Group(func(router chi.Router) {
		router.Use(authMiddleware.Authorize) // Protect all billing routes
		router.Mount("/billing", billingProxyHandler.Routes())
	})

	// Generic Proxy for other services
	proxyHandler := handlers.NewProxy(consulClient, logger, cfg.Services)
	r.HandleFunc("/*", proxyHandler.ServeHTTP) // Catch-all for proxying

	// ... (server start and graceful shutdown) ...
}

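Chi's standard middleware for request IDs, real client IPs, panic recovery, and a global request timeout, together with the custom Zap logger, is applied to every route. The explicitly registered routes (/auth, /jobs/submit, /billing) take precedence over the /* catch-all, so only unmatched paths reach the generic proxy.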

3.3. Authentication (internal/auth/jwt.go, internal/middleware/auth.go)

JWTs are used for stateless authentication.

  • JWT Generation (internal/auth/jwt.go): The GenerateToken function creates a new HS256-signed JWT containing the user claims (UserID, Username, Roles) plus standard registered claims: expiry, issued-at time, and the issuer "dante-api-gateway".

  • JWT Validation (internal/auth/jwt.go): The ValidateToken function parses a JWT string, verifies its signature and expiry, and returns its claims.

// Snippet from internal/auth/jwt.go - GenerateToken
// Claims struct to be encoded to a JWT
type Claims struct {
	UserID   string   `json:"user_id"`
	Username string   `json:"username"`
	Roles    []string `json:"roles"`
	jwt.RegisteredClaims
}

// GenerateToken creates a new JWT token
func GenerateToken(userID, username string, roles []string, secretKey string, expiry time.Duration) (string, error) {
	expirationTime := time.Now().Add(expiry)
	claims := &Claims{
		UserID:   userID,
		Username: username,
		Roles:    roles,
		RegisteredClaims: jwt.RegisteredClaims{
			ExpiresAt: jwt.NewNumericDate(expirationTime),
			IssuedAt:  jwt.NewNumericDate(time.Now()),
			Issuer:    "dante-api-gateway",
		},
	}

	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	tokenString, err := token.SignedString([]byte(secretKey))
	if err != nil {
		return "", err
	}
	return tokenString, nil
}

  • Auth Middleware (internal/middleware/auth.go): This middleware extracts the JWT from the Authorization header (expected format: Bearer <token>), validates it, and, if it is valid, injects the user's information (ID, username, roles) into the request context for downstream handlers.

// Snippet from internal/middleware/auth.go - Authorize method
func (am *AuthMiddleware) Authorize(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			http.Error(w, "Authorization header required", http.StatusUnauthorized)
			return
		}

		parts := strings.SplitN(authHeader, " ", 2)
		if len(parts) != 2 || !strings.EqualFold(parts[0], "Bearer") {
			http.Error(w, "Invalid authorization header format", http.StatusUnauthorized)
			return
		}
		tokenString := parts[1]

		claims, err := auth.ValidateToken(tokenString, am.jwtSecretKey)
		if err != nil {
			am.logger.Warn("Token validation failed", zap.Error(err))
			http.Error(w, "Invalid or expired token", http.StatusUnauthorized)
			return
		}

		// Add claims to context
		ctx := context.WithValue(r.Context(), auth.ContextUserKey, &auth.AuthenticatedUser{
			ID:       claims.UserID,
			Username: claims.Username,
			Roles:    claims.Roles,
		})

		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

3.4. Service Discovery and Load Balancing (internal/consul/client.go, internal/loadbalancer/balancer.go)

  • The ConsulClient discovers service instances from Consul.

  • The DiscoverService method queries Consul's health API with passingOnly set to true, so only instances whose health checks pass are returned, each as an http:// URL.

  • The RoundRobinBalancer selects a service instance using a round-robin strategy.

// Snippet from internal/consul/client.go - DiscoverService
func (cc *Client) DiscoverService(serviceName string) ([]*url.URL, error) {
	// PassingOnly = true returns only healthy services
	serviceEntries, _, err := cc.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		cc.logger.Error("Failed to discover service from Consul", zap.String("service", serviceName), zap.Error(err))
		return nil, err
	}

	if len(serviceEntries) == 0 {
		cc.logger.Warn("No healthy instances found for service", zap.String("service", serviceName))
		return nil, fmt.Errorf("no healthy instances found for service %s", serviceName)
	}

	var urls []*url.URL
	for _, entry := range serviceEntries {
		addr := entry.Service.Address
		port := entry.Service.Port
		if addr == "" { // If Consul agent returns empty address, use Node address
			addr = entry.Node.Address
		}
		serviceURL := &url.URL{
			Scheme: "http", // Assuming http, make configurable if needed
			Host:   fmt.Sprintf("%s:%d", addr, port),
		}
		urls = append(urls, serviceURL)
	}
	return urls, nil
}

3.5. Request Proxying (internal/handlers/proxy.go)

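The ProxyHandler (the Proxy type in internal/handlers/proxy.go) forwards requests to the discovered backend services. For each request it matches the path against each service's PathPrefix, discovers healthy instances through Consul, picks one with the load balancer, optionally strips the prefix, and forwards the request with httputil.ReverseProxy.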

// Snippet from internal/handlers/proxy.go - ServeHTTP method
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	var targetService *config.ServiceConfig
	var matchedServiceName string

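	// Note: map iteration order is random, so if two PathPrefix values overlap
	// (e.g. "/storage" and "/storage/admin"), which one matches first is not deterministic.
	for serviceName, serviceCfg := range p.serviceConfigs {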
		if strings.HasPrefix(r.URL.Path, serviceCfg.PathPrefix) {
			targetService = &serviceCfg
			matchedServiceName = serviceName
			break
		}
	}
	// ... (error handling if no service matches) ...

	// Discover service instance
	serviceInstances, err := p.consulClient.DiscoverService(targetService.Name)
	// ... (error handling for discovery) ...

	// Select instance using load balancer
	targetURL, err := p.loadBalancer.SelectInstance(serviceInstances)
	// ... (error handling for balancer) ...

	// Create reverse proxy
	proxy := httputil.NewSingleHostReverseProxy(targetURL)

	// Modify request
	originalPath := r.URL.Path
	r.Host = targetURL.Host
	r.URL.Host = targetURL.Host
	r.URL.Scheme = targetURL.Scheme
	if targetService.StripPrefix {
		r.URL.Path = strings.TrimPrefix(originalPath, targetService.PathPrefix)
		if !strings.HasPrefix(r.URL.Path, "/") {
			r.URL.Path = "/" + r.URL.Path
		}
	}
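	// ... (set X-Forwarded-For headers, etc.) ...
	// Note: httputil.ReverseProxy itself appends the client IP to X-Forwarded-For, so take care not to add it twice.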

	proxy.ServeHTTP(w, r)
}
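
Two caveats apply to the prefix loop above. Go randomizes map iteration order, so overlapping prefixes (say /storage and /storage/admin) match nondeterministically, and plain strings.HasPrefix also lets /auth match /authors. A hedged sketch of deterministic, segment-aware matching follows; matchPrefix, selectService, and stripPrefix are hypothetical helpers, and the map of prefixes stands in for p.serviceConfigs.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPrefix reports whether path falls under prefix at a segment boundary,
// so "/auth" matches "/auth" and "/auth/login" but not "/authors".
func matchPrefix(path, prefix string) bool {
	if !strings.HasPrefix(path, prefix) {
		return false
	}
	return len(path) == len(prefix) || strings.HasSuffix(prefix, "/") || path[len(prefix)] == '/'
}

// selectService returns the service whose prefix is the longest match, so the
// result does not depend on map iteration order. prefixes maps key -> PathPrefix.
func selectService(path string, prefixes map[string]string) (string, bool) {
	best, bestLen := "", -1
	for name, prefix := range prefixes {
		if matchPrefix(path, prefix) && len(prefix) > bestLen {
			best, bestLen = name, len(prefix)
		}
	}
	return best, bestLen >= 0
}

// stripPrefix mirrors the StripPrefix branch of ServeHTTP.
func stripPrefix(path, prefix string) string {
	p := strings.TrimPrefix(path, prefix)
	if !strings.HasPrefix(p, "/") {
		p = "/" + p
	}
	return p
}

func main() {
	prefixes := map[string]string{"storage": "/storage", "storage-admin": "/storage/admin"}
	name, _ := selectService("/storage/admin/buckets", prefixes)
	fmt.Println(name, stripPrefix("/storage/admin/buckets", prefixes[name])) // storage-admin /buckets
}
```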

3.6. Job Submission (internal/handlers/job.go, internal/nats/client.go)

The JobHandler exposes an endpoint (e.g., /jobs/submit) for users to submit compute jobs.

  • It receives job details in the request body.

  • The NatsClient is used to publish the job data to a configured NATS subject.

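  • Publishing goes through JetStream, so each job is persisted in a stream and acknowledged (a PubAck carrying the stream name and sequence number) before the gateway responds.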

// Snippet from internal/handlers/job.go - SubmitJobHandler
func (h *JobHandler) SubmitJobHandler(w http.ResponseWriter, r *http.Request) {
	// ... (decode request body into a JobRequest struct) ...

	// Extract user ID from context (set by auth middleware)
	authenticatedUser, ok := r.Context().Value(auth.ContextUserKey).(*auth.AuthenticatedUser)
	if !ok || authenticatedUser == nil {
		h.logger.Error("User not authenticated or user context not found")
		http.Error(w, "User authentication required", http.StatusUnauthorized)
		return
	}
	jobReq.UserID = authenticatedUser.ID // Populate UserID from authenticated user

	jobData, err := json.Marshal(jobReq)
	// ... (error handling for marshal) ...

	// Publish to NATS
	ack, err := h.natsClient.PublishJob(h.jobSubmissionSubject, jobData)
	// ... (error handling for NATS publish, check ack) ...

	w.Header().Set("Content-Type", "application/json") // Headers must be set before WriteHeader
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"message": "Job submitted successfully", "job_id": jobReq.JobID})
}

// Snippet from internal/nats/client.go - PublishJob
func (nc *Client) PublishJob(subject string, data []byte) (*nats.PubAck, error) {
	if nc.js == nil {
		return nil, errors.New("JetStream not initialized")
	}
	// Publish using JetStream, which returns a PubAck
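	// Use a fresh UUID as the Nats-Msg-Id. JetStream de-duplicates on this ID within the
	// stream's duplicate window, so reusing a stable ID (e.g. the job ID) would make retries idempotent.
	ack, err := nc.js.Publish(subject, data, nats.MsgId(uuid.New().String()))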
	if err != nil {
		nc.logger.Error("Failed to publish job to NATS JetStream", zap.String("subject", subject), zap.Error(err))
		return nil, err
	}
	nc.logger.Info("Job published to NATS JetStream successfully",
		zap.String("subject", subject),
		zap.String("stream", ack.Stream),
		zap.Uint64("sequence", ack.Sequence),
	)
	return ack, nil
}

3.7. Billing Proxy (internal/handlers/billing.go, internal/billing/client.go)

The BillingServiceProxy handler forwards requests to the Billing Payment Service.

The BillingClient in internal/billing/client.go is not used by the proxy. It is a client that other services (such as the Scheduler) use to call the billing service, and here it mainly serves as a reference for the billing data model. The proxy handler itself relies on the generic proxying mechanism.


4. API Endpoints

The API Gateway exposes several endpoints:

  • Auth Endpoints (e.g., /auth/):

    • POST /auth/register: User registration.

    • POST /auth/login: User login, returns JWT.

    • GET /auth/users/me: Get current user's details (requires auth).

    • POST /auth/validate: Validates a token (internal or admin use).

  • Job Submission Endpoint:

    • POST /jobs/submit: Submits a new compute job (requires auth).

  • Billing Service Endpoints (e.g., /billing/): Proxied to the Billing Payment Service (requires auth).

    • Examples: /billing/wallets/{userID}, /billing/sessions/start, /billing/usage-update.

  • Provider Registry Endpoints (e.g., /providers/): Proxied to the Provider Registry Service.

    • Examples: GET /providers, GET /providers/{providerID}.

  • Storage Service Endpoints (e.g., /storage/): Proxied to the Storage Service.

    • Examples: POST /storage/upload/{bucket_name}, GET /storage/download/{bucket_name}/{object_key}.

  • Health Check:

    • GET /health: Basic health check for the API Gateway itself.


5. Workflow

  1. Startup:

    • Loads configuration.

    • Initializes logger, Consul client, NATS client (with JetStream), and HTTP router (Chi).

    • Registers its own health check with Consul if enabled.

    • Sets up HTTP handlers for auth, job submission, the billing proxy, and generic proxying.

    • Starts the HTTP server.

  2. Incoming Request:

    • A client sends an HTTP request to the API Gateway.

    • Middleware (logging, timeout, etc.) is applied.

  3. Authentication/Authorization (if applicable):

    • The AuthMiddleware checks for a JWT in the Authorization header.

    • If the token is valid, user information is added to the request context.

    • If the route or service requires specific roles, these are checked against the user's roles.

  4. Routing & Proxying:

    • If the request is for a specific handler like /auth/login or /jobs/submit, the dedicated handler processes it.

    • For other requests, the generic ProxyHandler determines the target backend service based on the request path and service configurations.

    • It discovers healthy instances of the target service from Consul.

    • Selects an instance using the load balancer.

    • The request is modified (URL, headers) and forwarded to the chosen backend instance.

    • The response from the backend service is streamed back to the client.

  5. Job Submission Workflow:

    • User calls POST /jobs/submit with job details.

    • After authentication, the JobHandler marshals the job request and publishes it to the configured NATS subject using natsClient.PublishJob.

    • The API Gateway responds to the user with an acceptance (e.g., 202 Accepted).

  6. Shutdown:

    • Receives SIGINT or SIGTERM.

    • Deregisters from Consul (if applicable).

    • Gracefully shuts down the NATS client and HTTP server.


6. Future Considerations

  • Rate Limiting: Implement service-level and global rate limiting.

  • Caching: Implement caching for frequently accessed, static data.

  • Request/Response Transformation: Add capabilities for more complex transformations if needed.

  • WebSockets: Support for WebSocket proxying if real-time bidirectional communication is required for some services.

  • Advanced Load Balancing: Implement more sophisticated load balancing strategies (e.g., least connections, weighted round-robin).

  • Circuit Breaking: Implement circuit breaker patterns for improved resilience when backend services are down or slow.

  • Distributed Tracing: Integrate with a distributed tracing system (e.g., Jaeger, Zipkin) for better observability across microservices.

  • API Versioning: Strategy for managing multiple versions of APIs.
