API Gateway - Siger
The API Gateway is the primary entry point for all external client interactions with the Dante GPU Platform. It routes requests to the appropriate backend services, handles authentication and authorization, and is the natural place for other cross-cutting concerns such as rate limiting and request/response transformation (both planned).
1. Responsibilities
The API Gateway has the following key responsibilities:
Request Routing: Directs incoming HTTP requests to the correct microservices (e.g., Auth Service, Provider Registry, Scheduler Orchestrator, Billing Service, Storage Service). This is often based on path prefixes.
Authentication & Authorization: Verifies user identity (typically via JWTs) and ensures users have the permissions needed to access the requested resources or perform the requested actions.
Service Discovery: Integrates with Consul to dynamically discover the addresses and ports of backend services.
Load Balancing: Distributes requests across multiple instances of a backend service. The current strategy is simple round-robin; more sophisticated strategies could be implemented later.
Job Submission: Provides endpoints for users to submit new compute jobs, which are then published to a NATS stream for the Scheduler Orchestrator to consume.
API Aggregation: Can combine results from multiple services into a single response for the client.
Rate Limiting: (Planned) To prevent abuse and ensure fair usage.
Request/Response Transformation: (Planned) Modifying requests or responses as needed.
2. Tech Stack
The API Gateway is built using:
Programming Language: Go
HTTP Routing: Chi router
Service Discovery: HashiCorp Consul
Messaging Queue (for job submission): NATS (JetStream)
Logging: Zap for structured logging
Authentication: JWT (JSON Web Tokens)
Configuration: YAML files
3. Core Components and Logic
3.1. Configuration (internal/config/config.go)
The API Gateway's behavior is determined by its configuration file (configs/config.yaml).
Key configuration sections include:
server: Port and host for the HTTP server.
consul: Address for the Consul agent and service registration details.
nats: NATS server address and subject for job submissions.
jwt: Secret key for JWT signing and verification, and token expiry duration.
services: Configuration for backend services, including their names in Consul and path prefixes for routing.
billing_service: Specific configuration for the billing service client.
// Example of service configuration within the main Config struct
type ServiceConfig struct {
	Name         string          `yaml:"name"`          // Name of the service in Consul
	PathPrefix   string          `yaml:"path_prefix"`   // Path prefix to route to this service
	Timeout      time.Duration   `yaml:"timeout"`       // Request timeout for this service
	AuthRequired bool            `yaml:"auth_required"` // Whether requests to this service require authentication
	StripPrefix  bool            `yaml:"strip_prefix"`  // Whether to strip the PathPrefix before forwarding
	AllowedRoles []string        `yaml:"allowed_roles"` // Roles allowed to access this service if auth is required
	RateLimit    RateLimitConfig `yaml:"rate_limit"`    // Rate limiting configuration for this service
}

type Config struct {
	// ... other fields
	Services map[string]ServiceConfig `yaml:"services"`
	// ... other fields
}
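As an illustration of how the services section might be checked after loading, the following is a minimal sketch that validates each entry and applies a default timeout. The helper normalizeServices, the constant defaultTimeout, and the service name "billing-service" are hypothetical and do not come from the project; the struct is a trimmed copy of ServiceConfig without the RateLimit field.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// ServiceConfig mirrors a subset of the struct shown above (RateLimit omitted).
type ServiceConfig struct {
	Name         string
	PathPrefix   string
	Timeout      time.Duration
	AuthRequired bool
	StripPrefix  bool
	AllowedRoles []string
}

// defaultTimeout is a hypothetical fallback, not a value taken from the project.
const defaultTimeout = 30 * time.Second

// normalizeServices is a hypothetical helper: it rejects entries without a
// Consul name or with a path prefix that does not start with "/", and fills
// in a default timeout where none is configured.
func normalizeServices(services map[string]ServiceConfig) error {
	for key, svc := range services {
		if svc.Name == "" {
			return fmt.Errorf("service %q: name is required", key)
		}
		if !strings.HasPrefix(svc.PathPrefix, "/") {
			return fmt.Errorf("service %q: path_prefix %q must start with /", key, svc.PathPrefix)
		}
		if svc.Timeout <= 0 {
			svc.Timeout = defaultTimeout
		}
		services[key] = svc // write the modified copy back into the map
	}
	return nil
}

func main() {
	services := map[string]ServiceConfig{
		"billing": {Name: "billing-service", PathPrefix: "/billing", AuthRequired: true},
	}
	if err := normalizeServices(services); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Println("billing timeout:", services["billing"].Timeout)
}
```

Failing fast at startup on a malformed prefix is one possible design choice; it keeps routing errors out of the request path.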
3.2. HTTP Server and Routing (cmd/main.go)
The main function initializes the server, logger, Consul client, and NATS client, and sets up routing using chi.
// Snippet from cmd/main.go showing router setup
func main() {
	// ... (config loading, logger init, client inits) ...

	r := chi.NewRouter()

	// Middleware stack
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(custommiddleware.NewStructuredLogger(logger)) // Custom Zap logger
	r.Use(middleware.Recoverer)
	r.Use(middleware.Timeout(cfg.Server.Timeout)) // Global request timeout

	// Auth middleware (example for auth service)
	authMiddleware := custommiddleware.NewAuthMiddleware(cfg.JWT.SecretKey, logger, consulClient, cfg.Services["auth"].Name)

	// Register handlers
	authHandler := handlers.NewAuthHandler(consulClient, cfg.Services["auth"].Name, logger, cfg.JWT.SecretKey, time.Duration(cfg.JWT.ExpiryMinutes)*time.Minute)
	// Mount auth routes, potentially without global authMiddleware if login/register are public
	r.Mount("/auth", authHandler.Routes(authMiddleware))

	jobHandler := handlers.NewJobHandler(natsClient, cfg.NATS.JobSubmissionSubject, logger)
	// Example: job submission requires authorization
	r.With(authMiddleware.Authorize).Post("/jobs/submit", jobHandler.SubmitJobHandler)

	billingProxyHandler := handlers.NewBillingServiceProxy(consulClient, cfg.Services["billing"].Name, logger, cfg.BillingService)
	r.Group(func(router chi.Router) {
		router.Use(authMiddleware.Authorize) // Protect all billing routes
		router.Mount("/billing", billingProxyHandler.Routes())
	})

	// Generic proxy for other services
	proxyHandler := handlers.NewProxy(consulClient, logger, cfg.Services)
	r.HandleFunc("/*", proxyHandler.ServeHTTP) // Catch-all for proxying

	// ... (server start and graceful shutdown) ...
}
Standard middleware for request ID, real IP, logging, panic recovery, and timeouts is applied globally.
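To make the ordering of a global middleware stack concrete, here is a minimal standard-library sketch of the func(http.Handler) http.Handler pattern that chi middleware follows. It does not use chi itself; the chain and tag helpers are hypothetical and only record the order in which each layer runs, with the first registered middleware acting as the outermost wrapper.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Middleware is the usual shape of an HTTP middleware: it wraps a handler.
type Middleware func(http.Handler) http.Handler

// chain wraps h so that the first middleware in mws runs first on a request.
func chain(h http.Handler, mws ...Middleware) http.Handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag returns a middleware that appends its name to trace when it runs.
func tag(name string, trace *[]string) Middleware {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			*trace = append(*trace, name)
			next.ServeHTTP(w, r)
		})
	}
}

// traceOrder sends one request through a three-layer stack and reports
// the order in which the layers and the final handler were invoked.
func traceOrder() string {
	var trace []string
	final := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		trace = append(trace, "handler")
	})
	h := chain(final, tag("request-id", &trace), tag("logger", &trace), tag("recoverer", &trace))
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/health", nil))
	return strings.Join(trace, " > ")
}

func main() {
	fmt.Println(traceOrder()) // prints "request-id > logger > recoverer > handler"
}
```

Order matters in practice: a logger registered after the request ID middleware can include the ID in its log lines, which is consistent with the order used in the snippet above.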
3.3. Authentication (internal/auth/jwt.go, internal/middleware/auth.go)
JWTs are used for stateless authentication.
JWT Generation (internal/auth/jwt.go): The GenerateToken function creates a new JWT signed with a secret key, containing user claims (like UserID, username, roles).
JWT Validation (internal/auth/jwt.go): The ValidateToken function parses and verifies a JWT string.
// Snippet from internal/auth/jwt.go - GenerateToken

// Claims struct to be encoded to a JWT
type Claims struct {
	UserID   string   `json:"user_id"`
	Username string   `json:"username"`
	Roles    []string `json:"roles"`
	jwt.RegisteredClaims
}

// GenerateToken creates a new JWT token
func GenerateToken(userID, username string, roles []string, secretKey string, expiry time.Duration) (string, error) {
	expirationTime := time.Now().Add(expiry)
	claims := &Claims{
		UserID:   userID,
		Username: username,
		Roles:    roles,
		RegisteredClaims: jwt.RegisteredClaims{
			ExpiresAt: jwt.NewNumericDate(expirationTime),
			IssuedAt:  jwt.NewNumericDate(time.Now()),
			Issuer:    "dante-api-gateway",
		},
	}

	// Sign with HMAC-SHA256 using the shared secret
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	tokenString, err := token.SignedString([]byte(secretKey))
	if err != nil {
		return "", err
	}
	return tokenString, nil
}
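ValidateToken itself is not shown in this document. To illustrate what verifying an HS256 token involves, the following is a standard-library sketch under stated assumptions: it is not the gateway's implementation (which relies on a JWT library), and the names payload, sign, makeToken, and verifyHS256 are hypothetical. It checks the header algorithm, recomputes the HMAC-SHA256 signature in constant time, and rejects expired tokens.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"time"
)

// payload holds the subset of claims this sketch inspects.
type payload struct {
	UserID string   `json:"user_id"`
	Roles  []string `json:"roles"`
	Exp    int64    `json:"exp"`
}

// sign returns the base64url-encoded HMAC-SHA256 of signingInput under secret.
func sign(signingInput, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(signingInput))
	return base64.RawURLEncoding.EncodeToString(mac.Sum(nil))
}

// makeToken builds an HS256 token for the demo (hypothetical helper).
func makeToken(p payload, secret string) string {
	header := base64.RawURLEncoding.EncodeToString([]byte(`{"alg":"HS256","typ":"JWT"}`))
	body, _ := json.Marshal(p)
	signingInput := header + "." + base64.RawURLEncoding.EncodeToString(body)
	return signingInput + "." + sign(signingInput, secret)
}

// verifyHS256 validates the algorithm, signature, and expiry of token,
// then returns the decoded claims.
func verifyHS256(token, secret string, nowUnix int64) (*payload, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return nil, errors.New("malformed token")
	}
	var hdr struct {
		Alg string `json:"alg"`
	}
	rawHdr, err := base64.RawURLEncoding.DecodeString(parts[0])
	if err != nil || json.Unmarshal(rawHdr, &hdr) != nil || hdr.Alg != "HS256" {
		return nil, errors.New("unsupported or malformed header")
	}
	expected := sign(parts[0]+"."+parts[1], secret)
	if !hmac.Equal([]byte(expected), []byte(parts[2])) {
		return nil, errors.New("signature mismatch")
	}
	rawBody, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return nil, err
	}
	var p payload
	if err := json.Unmarshal(rawBody, &p); err != nil {
		return nil, err
	}
	if nowUnix >= p.Exp {
		return nil, errors.New("token expired")
	}
	return &p, nil
}

func main() {
	secret := "demo-secret" // placeholder; a real key comes from configuration
	tok := makeToken(payload{UserID: "u1", Roles: []string{"user"}, Exp: time.Now().Add(time.Hour).Unix()}, secret)
	p, err := verifyHS256(tok, secret, time.Now().Unix())
	fmt.Println(p, err)
}
```

Pinning the accepted algorithm before checking the signature matters because a verifier that trusts the token's own alg header can be tricked into skipping or weakening verification.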
Auth Middleware (internal/middleware/auth.go): This middleware extracts the JWT from the Authorization header, validates it, and if valid, injects user information (UserID, roles) into the request context for downstream handlers.
// Snippet from internal/middleware/auth.go - Authorize method
func (am *AuthMiddleware) Authorize(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			http.Error(w, "Authorization header required", http.StatusUnauthorized)
			return
		}

		// Expect "Bearer <token>"; Fields tolerates repeated whitespace
		parts := strings.Fields(authHeader)
		if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") {
			http.Error(w, "Invalid authorization header format", http.StatusUnauthorized)
			return
		}
		tokenString := parts[1]

		claims, err := auth.ValidateToken(tokenString, am.jwtSecretKey)
		if err != nil {
			am.logger.Warn("Token validation failed", zap.Error(err))
			http.Error(w, "Invalid or expired token", http.StatusUnauthorized)
			return
		}

		// Add claims to context
		ctx := context.WithValue(r.Context(), auth.ContextUserKey, &auth.AuthenticatedUser{
			ID:       claims.UserID,
			Username: claims.Username,
			Roles:    claims.Roles,
		})
		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
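The Authorize method above authenticates the caller but does not show a role check. A role check against a service's AllowedRoles could look roughly like the following sketch. The names RequireRoles, hasAnyRole, userKey, and statusFor are hypothetical, the AuthenticatedUser type is a local stand-in for auth.AuthenticatedUser, and treating an empty allowed list as "no restriction" is an assumption rather than documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type ctxKey string

// userKey is a hypothetical stand-in for auth.ContextUserKey.
const userKey ctxKey = "user"

// AuthenticatedUser is a local stand-in for auth.AuthenticatedUser.
type AuthenticatedUser struct {
	ID    string
	Roles []string
}

// hasAnyRole reports whether userRoles contains at least one allowed role.
// An empty allowed list is treated as "no role restriction" (an assumption).
func hasAnyRole(userRoles, allowed []string) bool {
	if len(allowed) == 0 {
		return true
	}
	set := make(map[string]struct{}, len(allowed))
	for _, role := range allowed {
		set[role] = struct{}{}
	}
	for _, role := range userRoles {
		if _, ok := set[role]; ok {
			return true
		}
	}
	return false
}

// RequireRoles returns middleware that responds 401 when no user is in the
// request context and 403 when the user lacks every allowed role.
func RequireRoles(allowed ...string) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			user, ok := r.Context().Value(userKey).(*AuthenticatedUser)
			if !ok || user == nil {
				http.Error(w, "authentication required", http.StatusUnauthorized)
				return
			}
			if !hasAnyRole(user.Roles, allowed) {
				http.Error(w, "insufficient permissions", http.StatusForbidden)
				return
			}
			next.ServeHTTP(w, r)
		})
	}
}

// statusFor runs one request through RequireRoles and returns the status code.
func statusFor(user *AuthenticatedUser, allowed ...string) int {
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	req := httptest.NewRequest(http.MethodGet, "/billing/wallets/u1", nil)
	if user != nil {
		req = req.WithContext(context.WithValue(req.Context(), userKey, user))
	}
	rec := httptest.NewRecorder()
	RequireRoles(allowed...)(ok).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	user := &AuthenticatedUser{ID: "u1", Roles: []string{"user"}}
	fmt.Println(statusFor(user, "user", "admin"), statusFor(user, "admin"), statusFor(nil, "admin"))
}
```

Separating 401 (no identity) from 403 (identity without permission) lets clients tell an expired session apart from a genuine access restriction.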
3.4. Service Discovery and Load Balancing (internal/consul/client.go, internal/loadbalancer/balancer.go)
The ConsulClient discovers service instances from Consul.
The DiscoverService method fetches healthy instances of a given service.
The RoundRobinBalancer selects a service instance using a round-robin strategy.
// Snippet from internal/consul/client.go - DiscoverService
func (cc *Client) DiscoverService(serviceName string) ([]*url.URL, error) {
	// PassingOnly = true returns only healthy services
	serviceEntries, _, err := cc.client.Health().Service(serviceName, "", true, nil)
	if err != nil {
		cc.logger.Error("Failed to discover service from Consul", zap.String("service", serviceName), zap.Error(err))
		return nil, err
	}
	if len(serviceEntries) == 0 {
		cc.logger.Warn("No healthy instances found for service", zap.String("service", serviceName))
		return nil, fmt.Errorf("no healthy instances found for service %s", serviceName)
	}

	urls := make([]*url.URL, 0, len(serviceEntries))
	for _, entry := range serviceEntries {
		addr := entry.Service.Address
		port := entry.Service.Port
		if addr == "" { // If Consul agent returns empty address, use Node address
			addr = entry.Node.Address
		}
		serviceURL := &url.URL{
			Scheme: "http", // Assuming http, make configurable if needed
			// JoinHostPort (package net) brackets IPv6 addresses correctly
			Host: net.JoinHostPort(addr, strconv.Itoa(port)),
		}
		urls = append(urls, serviceURL)
	}
	return urls, nil
}
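The RoundRobinBalancer is described but not shown. A minimal concurrency-safe sketch of round-robin selection over the URLs returned by DiscoverService might look like this. The SelectInstance signature matches how the proxy handler calls it in this document, but the body, the struct field, and the mustParseAll helper are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"sync/atomic"
)

// RoundRobinBalancer hands out instances in rotation. The counter is updated
// atomically so concurrent requests can share one balancer.
type RoundRobinBalancer struct {
	next uint64
}

// SelectInstance returns the next instance in rotation, or an error when the
// list is empty.
func (b *RoundRobinBalancer) SelectInstance(instances []*url.URL) (*url.URL, error) {
	if len(instances) == 0 {
		return nil, errors.New("no instances available")
	}
	n := atomic.AddUint64(&b.next, 1) - 1
	return instances[n%uint64(len(instances))], nil
}

// mustParseAll is a demo helper that parses raw URLs and panics on bad input.
func mustParseAll(raw ...string) []*url.URL {
	out := make([]*url.URL, 0, len(raw))
	for _, s := range raw {
		u, err := url.Parse(s)
		if err != nil {
			panic(err)
		}
		out = append(out, u)
	}
	return out
}

func main() {
	lb := &RoundRobinBalancer{}
	instances := mustParseAll("http://10.0.0.1:8080", "http://10.0.0.2:8080")
	for i := 0; i < 4; i++ {
		target, _ := lb.SelectInstance(instances)
		fmt.Println(target.Host)
	}
}
```

Because the instance list is re-fetched from Consul on each request, the rotation is only approximately even when the set of healthy instances changes between calls.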
3.5. Request Proxying (internal/handlers/proxy.go)
The ProxyHandler is responsible for forwarding requests to the discovered backend services. It modifies the request's URL and headers as needed before sending it to the target service.
// Snippet from internal/handlers/proxy.go - ServeHTTP method
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	var targetService *config.ServiceConfig
	var matchedServiceName string
	// Note: Go map iteration order is random, so overlapping path prefixes
	// would be matched non-deterministically by this loop.
	for serviceName, serviceCfg := range p.serviceConfigs {
		if strings.HasPrefix(r.URL.Path, serviceCfg.PathPrefix) {
			cfg := serviceCfg // copy so the pointer does not alias the loop variable
			targetService = &cfg
			matchedServiceName = serviceName
			break
		}
	}
	// ... (error handling if no service matches) ...

	// Discover service instance
	serviceInstances, err := p.consulClient.DiscoverService(targetService.Name)
	// ... (error handling for discovery) ...

	// Select instance using load balancer
	targetURL, err := p.loadBalancer.SelectInstance(serviceInstances)
	// ... (error handling for balancer) ...

	// Create reverse proxy
	proxy := httputil.NewSingleHostReverseProxy(targetURL)

	// Modify request
	originalPath := r.URL.Path
	r.Host = targetURL.Host
	r.URL.Host = targetURL.Host
	r.URL.Scheme = targetURL.Scheme
	if targetService.StripPrefix {
		r.URL.Path = strings.TrimPrefix(originalPath, targetService.PathPrefix)
		if !strings.HasPrefix(r.URL.Path, "/") {
			r.URL.Path = "/" + r.URL.Path
		}
	}
	// ... (set X-Forwarded-For headers, etc.) ...

	proxy.ServeHTTP(w, r)
}
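Because the service configurations are stored in a map, first-match prefix routing depends on iteration order whenever two prefixes overlap (for example /providers and /providers/admin). One way to make the choice deterministic is longest-prefix matching on path segment boundaries, sketched below. The helpers matchService, hasPathPrefix, and rewritePath and the example service names are hypothetical; this is an alternative technique, not the gateway's current behavior.

```go
package main

import (
	"fmt"
	"strings"
)

// ServiceConfig is a trimmed copy of the struct from section 3.1.
type ServiceConfig struct {
	Name        string
	PathPrefix  string
	StripPrefix bool
}

// hasPathPrefix reports whether path equals prefix or continues it at a
// segment boundary, so "/providersX" does not match "/providers".
func hasPathPrefix(path, prefix string) bool {
	p := strings.TrimSuffix(prefix, "/")
	return path == p || strings.HasPrefix(path, p+"/")
}

// matchService returns the key of the service with the longest matching
// prefix. Ties are broken by key name so the result never depends on map
// iteration order.
func matchService(path string, services map[string]ServiceConfig) (string, bool) {
	best, bestLen := "", -1
	for key, svc := range services {
		if !hasPathPrefix(path, svc.PathPrefix) {
			continue
		}
		n := len(svc.PathPrefix)
		if n > bestLen || (n == bestLen && key < best) {
			best, bestLen = key, n
		}
	}
	return best, bestLen >= 0
}

// rewritePath strips the service prefix when StripPrefix is set and always
// returns a path that starts with "/".
func rewritePath(path string, svc ServiceConfig) string {
	if !svc.StripPrefix {
		return path
	}
	trimmed := strings.TrimPrefix(path, strings.TrimSuffix(svc.PathPrefix, "/"))
	if !strings.HasPrefix(trimmed, "/") {
		trimmed = "/" + trimmed
	}
	return trimmed
}

func main() {
	services := map[string]ServiceConfig{
		"providers":      {Name: "provider-registry", PathPrefix: "/providers"},
		"provider-admin": {Name: "provider-admin", PathPrefix: "/providers/admin", StripPrefix: true},
	}
	for _, path := range []string{"/providers/42", "/providers/admin/keys", "/unknown"} {
		key, ok := matchService(path, services)
		if !ok {
			fmt.Println(path, "-> no match")
			continue
		}
		fmt.Println(path, "->", key, rewritePath(path, services[key]))
	}
}
```

Precomputing a slice of services sorted by prefix length at startup would avoid scanning the map on every request; the loop form is kept here for brevity.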
3.6. Job Submission (internal/handlers/job.go, internal/nats/client.go)
The JobHandler exposes an endpoint (e.g., /jobs/submit) for users to submit compute jobs.
It receives job details in the request body.
The NatsClient is used to publish the job data to a configured NATS subject.
It uses JetStream to ensure message persistence and delivery.
// Snippet from internal/handlers/job.go - SubmitJobHandler
func (h *JobHandler) SubmitJobHandler(w http.ResponseWriter, r *http.Request) {
	// ... (decode request body into a JobRequest struct) ...

	// Extract user ID from context (set by auth middleware)
	authenticatedUser, ok := r.Context().Value(auth.ContextUserKey).(*auth.AuthenticatedUser)
	if !ok || authenticatedUser == nil {
		h.logger.Error("User not authenticated or user context not found")
		http.Error(w, "User authentication required", http.StatusUnauthorized)
		return
	}
	jobReq.UserID = authenticatedUser.ID // Populate UserID from authenticated user

	jobData, err := json.Marshal(jobReq)
	// ... (error handling for marshal) ...

	// Publish to NATS
	ack, err := h.natsClient.PublishJob(h.jobSubmissionSubject, jobData)
	// ... (error handling for NATS publish, check ack) ...

	// Headers must be set before WriteHeader, or they are silently dropped
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusAccepted)
	json.NewEncoder(w).Encode(map[string]string{"message": "Job submitted successfully", "job_id": jobReq.JobID})
}
// Snippet from internal/nats/client.go - PublishJob
func (nc *Client) PublishJob(subject string, data []byte) (*nats.PubAck, error) {
	if nc.js == nil {
		return nil, errors.New("JetStream not initialized")
	}

	// Publish using JetStream, which returns a PubAck.
	// A unique MsgId is attached to each published message.
	ack, err := nc.js.Publish(subject, data, nats.MsgId(uuid.New().String()))
	if err != nil {
		nc.logger.Error("Failed to publish job to NATS JetStream", zap.String("subject", subject), zap.Error(err))
		return nil, err
	}
	nc.logger.Info("Job published to NATS JetStream successfully",
		zap.String("subject", subject),
		zap.String("stream", ack.Stream),
		zap.Uint64("sequence", ack.Sequence),
	)
	return ack, nil
}
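The decode and error-handling steps are elided in the handler snippet above. The following standard-library sketch fills them in under stated assumptions: the JobRequest fields, the JobPublisher interface, the fakePublisher test double, the status codes chosen for failures, and the subject name "jobs.submitted" are all hypothetical, and authentication is omitted to keep the example short. Hiding the NATS client behind a small interface is what allows the handler to be exercised without a running broker.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// JobRequest fields are hypothetical; the real struct is not shown here.
type JobRequest struct {
	JobID  string `json:"job_id"`
	UserID string `json:"user_id"`
	Image  string `json:"image"`
}

// JobPublisher abstracts the message broker client used by the handler.
type JobPublisher interface {
	PublishJob(subject string, data []byte) error
}

// fakePublisher is an in-memory stand-in for the NATS client.
type fakePublisher struct {
	fail     bool
	subjects []string
}

func (f *fakePublisher) PublishJob(subject string, data []byte) error {
	if f.fail {
		return errors.New("broker unavailable")
	}
	f.subjects = append(f.subjects, subject)
	return nil
}

// submitHandler decodes the job, publishes it, and answers 202 on success,
// 400 on a bad body, and 503 when the broker rejects the message.
func submitHandler(pub JobPublisher, subject string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req JobRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil || req.JobID == "" {
			http.Error(w, "invalid job request", http.StatusBadRequest)
			return
		}
		data, err := json.Marshal(req)
		if err != nil {
			http.Error(w, "failed to encode job", http.StatusInternalServerError)
			return
		}
		if err := pub.PublishJob(subject, data); err != nil {
			http.Error(w, "job queue unavailable", http.StatusServiceUnavailable)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusAccepted)
		json.NewEncoder(w).Encode(map[string]string{"job_id": req.JobID})
	}
}

// submit sends one request body through the handler and returns the status.
func submit(pub JobPublisher, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/jobs/submit", strings.NewReader(body))
	rec := httptest.NewRecorder()
	submitHandler(pub, "jobs.submitted")(rec, req)
	return rec.Code
}

func main() {
	body := `{"job_id":"j1","image":"cuda:12"}`
	fmt.Println(submit(&fakePublisher{}, body))           // successful publish
	fmt.Println(submit(&fakePublisher{fail: true}, body)) // broker failure
	fmt.Println(submit(&fakePublisher{}, "not json"))     // malformed body
}
```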
3.7. Billing Proxy (internal/handlers/billing.go, internal/billing/client.go)
The BillingServiceProxy handler forwards requests to the Billing Payment Service using the generic proxy mechanism.
The BillingClient in internal/billing/client.go is not used by the proxy itself. It is used by other services (like the Scheduler) to interact with the billing service, and is mentioned here because it shows the model interactions.
4. API Endpoints
The API Gateway exposes several endpoints:
Auth Endpoints (e.g., /auth/):
POST /auth/register: User registration.
POST /auth/login: User login, returns JWT.
GET /auth/users/me: Get current user's details (requires auth).
POST /auth/validate: Validates a token (internal or admin use).
Job Submission Endpoint:
POST /jobs/submit: Submits a new compute job (requires auth).
Billing Service Endpoints (e.g., /billing/): Proxied to the Billing Payment Service (requires auth).
Examples: /billing/wallets/{userID}, /billing/sessions/start, /billing/usage-update.
Provider Registry Endpoints (e.g., /providers/): Proxied to the Provider Registry Service.
Examples: GET /providers, GET /providers/{providerID}.
Storage Service Endpoints (e.g., /storage/): Proxied to the Storage Service.
Examples: POST /storage/upload/{bucket_name}, GET /storage/download/{bucket_name}/{object_key}.
Health Check:
GET /health: Basic health check for the API Gateway itself.
5. Workflow
Startup:
Loads configuration.
Initializes logger, Consul client, NATS client (with JetStream), and HTTP router (Chi).
Registers its own health check with Consul if enabled.
Sets up HTTP handlers for auth, job submission, and generic proxying.
Starts the HTTP server.
Incoming Request:
A client sends an HTTP request to the API Gateway.
Middleware (logging, timeout, etc.) is applied.
Authentication/Authorization (if applicable):
The AuthMiddleware checks for a JWT in the Authorization header.
If the token is valid, user information is added to the request context.
If the route or service requires specific roles, these are checked against the user's roles.
Routing & Proxying:
If the request is for a specific handler like /auth/login or /jobs/submit, the dedicated handler processes it.
For other requests, the generic ProxyHandler determines the target backend service based on the request path and service configurations.
It discovers healthy instances of the target service from Consul.
It selects an instance using the load balancer.
The request is modified (URL, headers) and forwarded to the chosen backend instance.
The response from the backend service is streamed back to the client.
Job Submission Workflow:
User calls POST /jobs/submit with job details.
After authentication, the JobHandler marshals the job request and publishes it to the configured NATS subject using natsClient.PublishJob.
The API Gateway responds to the user with an acceptance (e.g., 202 Accepted).
Shutdown:
Receives SIGINT or SIGTERM.
Deregisters from Consul (if applicable).
Gracefully shuts down the NATS client and HTTP server.
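The shutdown sequence can be sketched with the standard library alone. In the example below, serveUntilDone and runDemo are hypothetical names, the grace period is an arbitrary choice, and the 200 ms timeout exists only so the sketch exits on its own; a real gateway would wait for SIGINT or SIGTERM, and would deregister from Consul and close the NATS client around the HTTP shutdown as the steps above describe.

```go
package main

import (
	"context"
	"errors"
	"log"
	"net"
	"net/http"
	"os/signal"
	"syscall"
	"time"
)

// serveUntilDone serves on ln until ctx is cancelled, then gives in-flight
// requests up to grace to finish before returning.
func serveUntilDone(ctx context.Context, srv *http.Server, ln net.Listener, grace time.Duration) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.Serve(ln) }()

	select {
	case err := <-errCh:
		return err // the server stopped before any shutdown was requested
	case <-ctx.Done():
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), grace)
	defer cancel()
	if err := srv.Shutdown(shutdownCtx); err != nil {
		return err
	}
	// After a clean Shutdown, Serve returns http.ErrServerClosed.
	if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}

// runDemo wires signal handling to the server lifecycle.
func runDemo() error {
	// Cancel the context on SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Demo only: also stop after 200ms so the sketch exits without a signal.
	ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
	defer cancel()

	ln, err := net.Listen("tcp", "127.0.0.1:0") // port 0 picks any free port
	if err != nil {
		return err
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}

	// In the gateway, Consul deregistration and NATS client shutdown would
	// happen around this call.
	return serveUntilDone(ctx, srv, ln, 5*time.Second)
}

func main() {
	if err := runDemo(); err != nil {
		log.Fatal(err)
	}
	log.Println("server stopped cleanly")
}
```

Deregistering from Consul before stopping the HTTP server gives other components time to stop routing traffic to an instance that is about to disappear.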
6. Future Considerations
Rate Limiting: Implement service-level and global rate limiting.
Caching: Implement caching for frequently accessed, static data.
Request/Response Transformation: Add capabilities for more complex transformations if needed.
WebSockets: Support for WebSocket proxying if real-time bidirectional communication is required for some services.
Advanced Load Balancing: Implement more sophisticated load balancing strategies (e.g., least connections, weighted round-robin).
Circuit Breaking: Implement circuit breaker patterns for improved resilience when backend services are down or slow.
Distributed Tracing: Integrate with a distributed tracing system (e.g., Jaeger, Zipkin) for better observability across microservices.
API Versioning: Strategy for managing multiple versions of APIs.