API Security

API Protection and Security for AI-Built Apps

Secure your APIs against common attacks with advanced protection strategies for vibe-coded applications.

Vibe Security Team
2/1/2024
18 min read

APIs serve as the backbone of modern applications, enabling communication between services, mobile apps, and third-party integrations. However, AI-generated API code often lacks essential security controls, creating vulnerabilities that can lead to data breaches, unauthorized access, and service abuse. This article walks through four controls for APIs in AI-built applications, with Python examples for each: authentication and authorization, input validation, rate limiting, and response filtering.

The API Security Challenge in AI-Generated Code

AI code generation tools excel at creating functional API endpoints but frequently overlook critical security considerations. They may generate endpoints that work perfectly for basic functionality while leaving applications vulnerable to sophisticated attacks. Common issues include missing authentication, inadequate input validation, excessive data exposure, and lack of rate limiting.

The challenge is compounded by the rapid development pace enabled by AI tools, where security considerations may be deferred in favor of quickly implementing features. This article addresses these challenges by providing practical, implementable security measures that can be integrated into AI-generated API code.

Comprehensive API Authentication and Authorization

Proper authentication and authorization form the foundation of API security, yet AI-generated APIs often implement weak or incomplete access controls.

JWT-Based Authentication with Security Enhancements

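The following JWT manager, built on PyJWT and Redis, addresses common weaknesses in AI-generated authentication: it pins the signing algorithm, issues short-lived access tokens, gives every token a unique ID, and supports revocation through a blacklist.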

import json
import secrets
from datetime import datetime, timedelta

import jwt  # PyJWT

class SecureJWTManager:
    def __init__(self, secret_key, redis_client):
        self.secret_key = secret_key
        self.redis_client = redis_client
        self.algorithm = 'HS256'
        self.access_token_expiry = timedelta(minutes=15)
        self.refresh_token_expiry = timedelta(days=7)
        self.blacklist_key_prefix = 'jwt_blacklist:'
    
    def generate_tokens(self, user_id, permissions=None):
        """Generate secure access and refresh tokens"""
        now = datetime.utcnow()
        jti = secrets.token_urlsafe(32)  # Unique token ID
        
        # Access token payload
        access_payload = {
            'user_id': user_id,
            'exp': now + self.access_token_expiry,
            'iat': now,
            'jti': jti,
            'type': 'access',
            'permissions': permissions or []
        }
        
        # Refresh token payload
        refresh_jti = secrets.token_urlsafe(32)
        refresh_payload = {
            'user_id': user_id,
            'exp': now + self.refresh_token_expiry,
            'iat': now,
            'jti': refresh_jti,
            'type': 'refresh'
        }
        
        access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
        
        # Store token metadata for tracking
        self._store_token_metadata(jti, user_id, 'access', now + self.access_token_expiry)
        self._store_token_metadata(refresh_jti, user_id, 'refresh', now + self.refresh_token_expiry)
        
        return {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'expires_in': self.access_token_expiry.total_seconds()
        }
    
    def validate_token(self, token, required_permissions=None):
        """Validate JWT token with security checks"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            
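            # A token without a jti cannot be checked against the blacklist
            jti = payload.get('jti')
            if not jti:
                raise jwt.InvalidTokenError("Token is missing the jti claim")
            
            # Check if token is blacklisted
            if self._is_token_blacklisted(jti):
                raise jwt.InvalidTokenError("Token has been revoked")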
            
            # Verify token type
            if payload.get('type') != 'access':
                raise jwt.InvalidTokenError("Invalid token type")
            
            # Check permissions if required
            if required_permissions:
                user_permissions = set(payload.get('permissions', []))
                required_perms = set(required_permissions)
                
                if not required_perms.issubset(user_permissions):
                    raise jwt.InvalidTokenError("Insufficient permissions")
            
            return payload
            
        except jwt.ExpiredSignatureError:
            raise jwt.InvalidTokenError("Token has expired")
        except jwt.InvalidTokenError:
            raise
    
    def revoke_token(self, token):
        """Revoke a specific token by blacklisting"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
            jti = payload['jti']
            exp = payload['exp']
            
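            # Add to blacklist until expiration. 'exp' is a UTC timestamp, so
            # convert it with utcfromtimestamp to match utcnow(); fromtimestamp
            # would use local time and skew the TTL by the UTC offset.
            expiry_time = datetime.utcfromtimestamp(exp) - datetime.utcnow()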
            if expiry_time.total_seconds() > 0:
                self.redis_client.setex(
                    f"{self.blacklist_key_prefix}{jti}",
                    int(expiry_time.total_seconds()),
                    "revoked"
                )
            
            return True
        except jwt.InvalidTokenError:
            return False
    
    def _is_token_blacklisted(self, jti):
        """Check if token is blacklisted"""
        return self.redis_client.exists(f"{self.blacklist_key_prefix}{jti}")
    
    def _store_token_metadata(self, jti, user_id, token_type, expiry):
        """Store token metadata for tracking"""
        metadata = {
            'user_id': user_id,
            'type': token_type,
            'issued_at': datetime.utcnow().isoformat(),
            'expires_at': expiry.isoformat()
        }
        
        expiry_seconds = int((expiry - datetime.utcnow()).total_seconds())
        self.redis_client.setex(
            f"token_metadata:{jti}",
            expiry_seconds,
            json.dumps(metadata)
        )
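
SecureJWTManager receives secret_key as a constructor argument, but where that key comes from matters as much as the class itself: with HS256, anyone who learns or guesses the secret can forge tokens. RFC 7518 requires an HMAC key at least as long as the hash output, which is 256 bits for HS256. The sketch below shows one way to load and check the secret at startup. The environment variable name JWT_SECRET_KEY and both function names are assumptions made for this example, not part of the code above.

```python
import os
import secrets

MIN_HS256_KEY_BYTES = 32  # 256 bits, the SHA-256 output size


def load_jwt_secret(env_var="JWT_SECRET_KEY"):
    """Read the signing secret from the environment and reject short keys.

    The variable name is a placeholder chosen for this example.
    """
    value = os.environ.get(env_var, "")
    if len(value.encode("utf-8")) < MIN_HS256_KEY_BYTES:
        raise ValueError(
            f"{env_var} must be set to at least {MIN_HS256_KEY_BYTES} bytes"
        )
    return value


def generate_jwt_secret():
    """Create a fresh random secret (64 random bytes, URL-safe encoded)."""
    return secrets.token_urlsafe(64)
```

Failing at startup when the key is missing or short is a deliberate choice here: a hardcoded fallback secret would keep the service running but make every token forgeable.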

Role-Based Access Control (RBAC)

Role-based access control maps each role to a fixed set of permissions and checks those permissions on every endpoint. The decorator in this example expects authentication middleware to have already stored the caller's permissions in Flask's g.user_permissions; if that attribute is missing, the request is rejected with 401.

from enum import Enum
from functools import wraps
from flask import request, jsonify, g

class Permission(Enum):
    READ_USER = "read_user"
    WRITE_USER = "write_user"
    DELETE_USER = "delete_user"
    READ_ADMIN = "read_admin"
    WRITE_ADMIN = "write_admin"
    SYSTEM_ADMIN = "system_admin"

class Role(Enum):
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
    SYSTEM = "system"

class RBACManager:
    def __init__(self):
        self.role_permissions = {
            Role.USER: [Permission.READ_USER, Permission.WRITE_USER],
            Role.MODERATOR: [
                Permission.READ_USER, Permission.WRITE_USER,
                Permission.DELETE_USER, Permission.READ_ADMIN
            ],
            Role.ADMIN: [
                Permission.READ_USER, Permission.WRITE_USER,
                Permission.DELETE_USER, Permission.READ_ADMIN,
                Permission.WRITE_ADMIN
            ],
            Role.SYSTEM: [perm for perm in Permission]
        }
    
    def get_permissions_for_role(self, role):
        """Get all permissions for a specific role"""
        return [perm.value for perm in self.role_permissions.get(role, [])]
    
    def require_permission(self, required_permissions):
        """Decorator for endpoint permission checking"""
        def decorator(f):
            @wraps(f)
            def decorated_function(*args, **kwargs):
                if not hasattr(g, 'user_permissions'):
                    return jsonify({'error': 'Authentication required'}), 401
                
                user_perms = set(g.user_permissions)
                required_perms = set(required_permissions)
                
                if not required_perms.issubset(user_perms):
                    # Do not echo the required or granted permissions: that
                    # detail helps an attacker map the authorization model
                    return jsonify({'error': 'Insufficient permissions'}), 403
                
                return f(*args, **kwargs)
            return decorated_function
        return decorator

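# Usage example (assumes a Flask app whose authentication middleware
# sets g.user_permissions before the view runs)
from flask import Flask

app = Flask(__name__)
rbac = RBACManager()

@app.route('/api/users/<int:user_id>', methods=['DELETE'])
@rbac.require_permission([Permission.DELETE_USER.value])
def delete_user(user_id):
    # Implementation here
    pass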
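
The decorator reads g.user_permissions, but nothing in the code so far sets it. One way to close that gap, sketched here under assumptions, is a before_request hook that pulls the bearer token from the Authorization header and passes it to validate_token. The helper extract_bearer_token and the jwt_manager instance in the commented wiring are hypothetical names introduced for this example.

```python
def extract_bearer_token(authorization_header):
    """Return the token from an 'Authorization: Bearer <token>' header value.

    Returns None when the header is missing or malformed, so the caller can
    answer with 401 instead of handing arbitrary input to the JWT library.
    """
    if not authorization_header:
        return None
    parts = authorization_header.split()
    if len(parts) != 2 or parts[0].lower() != "bearer":
        return None
    return parts[1]


# Hypothetical wiring inside the Flask app (shown as comments, not executed):
#
# @app.before_request
# def authenticate():
#     token = extract_bearer_token(request.headers.get("Authorization"))
#     if token is None:
#         return  # g.user_permissions stays unset; the decorator returns 401
#     try:
#         payload = jwt_manager.validate_token(token)
#     except jwt.InvalidTokenError:
#         return jsonify({"error": "Invalid token"}), 401
#     g.user_permissions = payload.get("permissions", [])
```

Keeping the header parsing in a plain function makes it easy to unit test without starting a Flask application.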

Advanced Input Validation and Sanitization

AI-generated APIs often lack comprehensive input validation, leading to injection attacks and data corruption.

Comprehensive Input Validation Framework

import os
import re
from marshmallow import Schema, fields, validate, validates_schema, ValidationError
import bleach
from urllib.parse import urlparse

class SecureInputValidator:
    def __init__(self):
        self.email_regex = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
        self.phone_regex = re.compile(r'^\+?1?[2-9]\d{2}[2-9]\d{2}\d{4}$')
        self.allowed_html_tags = ['b', 'i', 'u', 'em', 'strong', 'p', 'br']
        self.allowed_html_attributes = {}
    
    def sanitize_html(self, content):
        """Sanitize HTML content to prevent XSS"""
        return bleach.clean(
            content,
            tags=self.allowed_html_tags,
            attributes=self.allowed_html_attributes,
            strip=True
        )
    
    def validate_url(self, url):
        """Validate URL format and scheme"""
        # Only parsing errors are caught here. Catching every exception
        # would also swallow the specific ValidationError messages below.
        try:
            parsed = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            raise ValidationError('Invalid URL format')
        if parsed.scheme not in ('http', 'https'):
            raise ValidationError('URL must use HTTP or HTTPS protocol')
        if not parsed.netloc:
            raise ValidationError('URL must have a valid domain')
        return True
    
    def validate_file_upload(self, file_data):
        """Validate file upload data"""
        allowed_extensions = {'.jpg', '.jpeg', '.png', '.gif', '.pdf', '.docx'}
        allowed_mime_types = {
            'image/jpeg', 'image/png', 'image/gif',
            'application/pdf', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        }
        max_size = 10 * 1024 * 1024  # 10MB
        
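        # Check file extension (a missing filename is rejected, not skipped)
        filename = file_data.get('filename')
        if not filename:
            raise ValidationError('Filename is required')
        ext = os.path.splitext(filename)[1].lower()
        if ext not in allowed_extensions:
            raise ValidationError(f'File extension {ext} not allowed')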
        
        # Check MIME type
        if file_data.get('content_type') not in allowed_mime_types:
            raise ValidationError('Invalid file type')
        
        # Check file size
        if file_data.get('size', 0) > max_size:
            raise ValidationError('File size exceeds maximum allowed size')
        
        return True

class UserRegistrationSchema(Schema):
    username = fields.Str(
        required=True,
        validate=[
            validate.Length(min=3, max=30),
            validate.Regexp(r'^[a-zA-Z0-9_]+$', error='Username can only contain letters, numbers, and underscores')
        ]
    )
    email = fields.Email(required=True)
    password = fields.Str(
        required=True,
        validate=validate.Length(min=12, max=128)
    )
    phone_number = fields.Str(
        required=False,
        validate=validate.Regexp(
            r'^\+?1?[2-9]\d{2}[2-9]\d{2}\d{4}$',
            error='Invalid phone number format'
        )
    )
    bio = fields.Str(
        required=False,
        validate=validate.Length(max=500)
    )
    
    @validates_schema
    def validate_password_strength(self, data, **kwargs):
        password = data.get('password')
        if password:
            # Check password complexity
            if not re.search(r'[A-Z]', password):
                raise ValidationError('Password must contain at least one uppercase letter')
            if not re.search(r'[a-z]', password):
                raise ValidationError('Password must contain at least one lowercase letter')
            if not re.search(r'\d', password):
                raise ValidationError('Password must contain at least one number')
            if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
                raise ValidationError('Password must contain at least one special character')
    
    def load_and_sanitize(self, json_data):
        """Load and sanitize input data"""
        validator = SecureInputValidator()
        
        try:
            validated_data = self.load(json_data)
            
            # Sanitize HTML content
            if 'bio' in validated_data:
                validated_data['bio'] = validator.sanitize_html(validated_data['bio'])
            
            return validated_data
        except ValidationError as e:
            raise ValidationError(f"Input validation failed: {e.messages}")
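
One limitation of validate_file_upload is that the filename and content type are both supplied by the client, so a renamed executable passes both checks. A common complement is to compare the first bytes of the upload with the known signature of the declared type. The sketch below assumes the caller can read the first few bytes of the file; the names FILE_SIGNATURES and content_matches_declared_type are made up for this example.

```python
# Leading byte signatures for the file types the validator above allows.
FILE_SIGNATURES = {
    "image/jpeg": [b"\xff\xd8\xff"],
    "image/png": [b"\x89PNG\r\n\x1a\n"],
    "image/gif": [b"GIF87a", b"GIF89a"],
    "application/pdf": [b"%PDF-"],
    # .docx files are ZIP archives, so this only confirms the ZIP container.
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": [
        b"PK\x03\x04"
    ],
}


def content_matches_declared_type(first_bytes, declared_mime_type):
    """Return True if the data starts with a signature of the declared type."""
    signatures = FILE_SIGNATURES.get(declared_mime_type)
    if signatures is None:
        return False  # unknown types are rejected rather than trusted
    return any(first_bytes.startswith(sig) for sig in signatures)
```

A signature match is a necessary check, not a sufficient one: it catches mislabeled files but does not prove the rest of the content is harmless.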

API Rate Limiting and DDoS Protection

Comprehensive rate limiting protects APIs from abuse and ensures fair resource allocation among users.

Advanced Rate Limiting Implementation

import time
import hashlib
from functools import wraps

import redis
from flask import request, jsonify, g, make_response

class AdvancedRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.rate_limit_configs = {
            'default': {'requests': 100, 'window': 3600},  # 100 requests per hour
            'auth': {'requests': 10, 'window': 900},       # 10 login attempts per 15 minutes
            'upload': {'requests': 5, 'window': 3600},     # 5 uploads per hour
            'premium': {'requests': 1000, 'window': 3600}  # Premium users get higher limits
        }
    
    def is_allowed(self, identifier, endpoint_type='default', user_tier='basic'):
        """Check if request is allowed based on rate limits"""
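        # Look up the limit for this endpoint type
        config = self.rate_limit_configs.get(endpoint_type, self.rate_limit_configs['default'])
        
        # Premium users get the higher general limit only. Stricter limits
        # such as 'auth' must apply to every tier, otherwise a premium
        # account could make 1000 login attempts per hour.
        if user_tier == 'premium' and config is self.rate_limit_configs['default']:
            config = self.rate_limit_configs['premium']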
        
        key = f"rate_limit:{endpoint_type}:{identifier}"
        current_time = int(time.time())
        window_start = current_time - config['window']
        
        # Use Redis sorted set to track requests in time window
        pipe = self.redis.pipeline()
        
        # Remove expired entries
        pipe.zremrangebyscore(key, 0, window_start)
        
        # Count current requests
        pipe.zcard(key)
        
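        # Add current request. The member must be unique per request: using
        # the second-resolution timestamp as the member would collapse all
        # requests made within the same second into a single entry.
        pipe.zadd(key, {str(time.time_ns()): current_time})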
        
        # Set expiration
        pipe.expire(key, config['window'])
        
        results = pipe.execute()
        current_count = results[1]
        
        if current_count >= config['requests']:
            # Calculate time until reset
            oldest_request = self.redis.zrange(key, 0, 0, withscores=True)
            if oldest_request:
                reset_time = oldest_request[0][1] + config['window']
                return {
                    'allowed': False,
                    'requests_remaining': 0,
                    'reset_time': reset_time,
                    'retry_after': int(reset_time - current_time)
                }
        
        return {
            'allowed': True,
            'requests_remaining': config['requests'] - current_count - 1,
            'reset_time': current_time + config['window']
        }
    
    def get_client_identifier(self, request):
        """Generate a client identifier for rate limiting"""
        # Prefer the authenticated user ID: it cannot be changed per request
        if hasattr(g, 'current_user') and g.current_user:
            return hashlib.sha256(f"user:{g.current_user.id}".encode()).hexdigest()
        
        # Otherwise fall back to the connection address. User-Agent and
        # X-Forwarded-For are client-controlled, so mixing them into the
        # identifier would let a caller get a fresh rate-limit bucket just by
        # changing a header. Behind a trusted reverse proxy, configure the
        # framework's proxy handling so remote_addr is the real client address.
        return hashlib.sha256(f"ip:{request.remote_addr}".encode()).hexdigest()

def rate_limit(endpoint_type='default'):
    """Decorator for applying rate limits to API endpoints"""
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
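            # redis_client is assumed to be a shared redis.Redis instance
            # created once at application startup
            limiter = AdvancedRateLimiter(redis_client)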
            client_id = limiter.get_client_identifier(request)
            
            # Determine user tier
            user_tier = 'basic'
            if hasattr(g, 'current_user') and g.current_user:
                user_tier = g.current_user.subscription_tier or 'basic'
            
            result = limiter.is_allowed(client_id, endpoint_type, user_tier)
            
            if not result['allowed']:
                response = jsonify({
                    'error': 'Rate limit exceeded',
                    'retry_after': result['retry_after']
                })
                response.status_code = 429
                response.headers['Retry-After'] = str(result['retry_after'])
                response.headers['X-RateLimit-Remaining'] = '0'
                response.headers['X-RateLimit-Reset'] = str(result['reset_time'])
                return response
            
            # Add rate limit headers to successful responses
            response = make_response(f(*args, **kwargs))
            response.headers['X-RateLimit-Remaining'] = str(result['requests_remaining'])
            response.headers['X-RateLimit-Reset'] = str(result['reset_time'])
            
            return response
        return decorated_function
    return decorator
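
The sorted-set logic in is_allowed is a sliding-window counter: drop timestamps older than the window, count what remains, and admit the request only if the count is under the limit. The same idea can be sketched in memory, which is handy for unit tests or a single-process service. The class name and the injectable clock parameter are assumptions made for this example, and this version does not record rejected requests, whereas the Redis version above adds every request to the set.

```python
import time
from collections import defaultdict, deque


class InMemorySlidingWindowLimiter:
    """Single-process sliding-window limiter (illustrative, not distributed)."""

    def __init__(self, max_requests, window_seconds, clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.clock = clock  # injectable so tests can control time
        self._hits = defaultdict(deque)  # identifier -> request timestamps

    def is_allowed(self, identifier):
        now = self.clock()
        hits = self._hits[identifier]
        # Drop timestamps that have left the window (the role played by
        # ZREMRANGEBYSCORE in the Redis version).
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


# Usage with a fake clock: three requests fit, the fourth is rejected,
# and the client is admitted again once the window has passed.
fake_now = [0.0]
limiter = InMemorySlidingWindowLimiter(3, 60, clock=lambda: fake_now[0])
results = [limiter.is_allowed("client-a") for _ in range(4)]
print(results)  # [True, True, True, False]
fake_now[0] = 61.0
print(limiter.is_allowed("client-a"))  # True
```

Because the state lives in one process, this variant does not share limits across workers; that is the reason the article's version keeps the counters in Redis.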

API Response Security and Data Exposure Prevention

AI-generated APIs often return excessive data, potentially exposing sensitive information.

Secure Response Serialization

from marshmallow import Schema, fields, post_dump

class SecureResponseSerializer:
    def __init__(self):
        self.sensitive_fields = [
            'password', 'password_hash', 'secret_key', 'api_key',
            'ssn', 'credit_card', 'bank_account', 'internal_id'
        ]
    
    def serialize_user_response(self, user_data, current_user_id, is_admin=False):
        """Serialize user data based on permissions"""
        if isinstance(user_data, list):
            return [self._serialize_single_user(user, current_user_id, is_admin) 
                   for user in user_data]
        else:
            return self._serialize_single_user(user_data, current_user_id, is_admin)
    
    def _serialize_single_user(self, user, current_user_id, is_admin):
        """Serialize single user object"""
        # Base public fields
        public_fields = ['id', 'username', 'created_at', 'public_profile']
        
        # Add fields for own profile or admin
        if user.get('id') == current_user_id or is_admin:
            public_fields.extend(['email', 'phone', 'last_login'])
        
        # Admin-only fields
        if is_admin:
            public_fields.extend(['subscription_tier', 'account_status'])
        
        # Filter response
        filtered_response = {field: user.get(field) for field in public_fields if field in user}
        
        # Remove any remaining sensitive data
        for sensitive_field in self.sensitive_fields:
            filtered_response.pop(sensitive_field, None)
        
        return filtered_response

class UserPublicSchema(Schema):
    id = fields.Int()
    username = fields.Str()
    created_at = fields.DateTime()
    
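    # Keys stripped after dumping; subclasses narrow this list when they
    # intentionally expose more fields
    sensitive_keys = ['password', 'email', 'phone']
    
    @post_dump
    def remove_sensitive_data(self, data, **kwargs):
        """Final cleanup of sensitive data"""
        for key in self.sensitive_keys:
            data.pop(key, None)
        return data

class UserPrivateSchema(UserPublicSchema):
    # Without this override the inherited hook would strip the email and
    # phone fields this schema is meant to return
    sensitive_keys = ['password']
    
    email = fields.Email()
    phone = fields.Str()
    last_login = fields.DateTime()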
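
Allow-list serialization is easy to break later, for example when someone adds a nested object to a response. A small regression check that walks a response payload and reports any sensitive key, at any depth, can catch that in tests. This is a sketch under assumptions: the function name find_sensitive_keys is made up, and the key list simply mirrors sensitive_fields from SecureResponseSerializer above.

```python
SENSITIVE_KEYS = {
    "password", "password_hash", "secret_key", "api_key",
    "ssn", "credit_card", "bank_account", "internal_id",
}


def find_sensitive_keys(payload, path=""):
    """Return the paths of sensitive keys found anywhere in a JSON-like value."""
    found = []
    if isinstance(payload, dict):
        for key, value in payload.items():
            child_path = f"{path}.{key}" if path else str(key)
            if str(key).lower() in SENSITIVE_KEYS:
                found.append(child_path)
            found.extend(find_sensitive_keys(value, child_path))
    elif isinstance(payload, list):
        for index, item in enumerate(payload):
            found.extend(find_sensitive_keys(item, f"{path}[{index}]"))
    return found


# Example: a nested leak that a top-level filter would miss.
response_body = {
    "id": 1,
    "profile": {"api_key": "k-123"},
    "sessions": [{"password": "hunter2"}],
}
print(find_sensitive_keys(response_body))
# ['profile.api_key', 'sessions[0].password']
```

In a test suite, asserting that this function returns an empty list for every endpoint's response gives an early warning when a new field slips through.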

API Security Implementation Checklist

  1. Authentication & Authorization

    • Implement JWT with secure practices
    • Use role-based access control (RBAC)
    • Implement token blacklisting and refresh mechanisms

  2. Input Validation

    • Validate all input data using schemas
    • Sanitize HTML content to prevent XSS
    • Implement file upload security checks

  3. Rate Limiting

    • Apply rate limiting to all endpoints
    • Implement progressive rate limiting
    • Use distributed rate limiting for scalability

  4. Response Security

    • Filter sensitive data from responses
    • Implement proper error handling
    • Use appropriate HTTP status codes

  5. Infrastructure Security

    • Use HTTPS for all API communications
    • Implement proper CORS policies
    • Add security headers to responses
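
The infrastructure items in the checklist are not covered by the earlier examples, so here is a minimal sketch of the last two: a fixed set of security headers for API responses and a CORS policy based on an explicit origin allow-list. The origin https://app.example.com is a placeholder, the header values are common defaults rather than requirements, and build_response_headers is a name invented for this example.

```python
ALLOWED_ORIGINS = {"https://app.example.com"}  # placeholder origin

SECURITY_HEADERS = {
    # Tell browsers to use HTTPS for future requests to this host
    "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
    # Stop browsers from guessing a different content type
    "X-Content-Type-Options": "nosniff",
    # API responses should never be rendered inside a frame
    "X-Frame-Options": "DENY",
    # Keep responses that may contain personal data out of caches
    "Cache-Control": "no-store",
}


def build_response_headers(request_origin):
    """Return headers for a response, adding CORS only for known origins."""
    headers = dict(SECURITY_HEADERS)
    if request_origin in ALLOWED_ORIGINS:
        # Echo the specific origin instead of using a wildcard
        headers["Access-Control-Allow-Origin"] = request_origin
        headers["Vary"] = "Origin"
    return headers
```

In Flask, a function like this could be applied from an after_request hook, passing request.headers.get("Origin") as the argument.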

Conclusion

Securing APIs in AI-built applications requires comprehensive attention to authentication, input validation, rate limiting, and response filtering. While AI tools can rapidly generate functional API code, implementing these security measures ensures that applications remain protected against sophisticated attacks. Regular security audits and updates are essential for maintaining robust API security in production environments.
