Back to Blog
6 min read

Sandboxing AI Agents: Isolation and Containment Strategies

Sandboxing AI agents limits potential damage from unexpected behavior. Let’s explore strategies for isolating agents while maintaining their usefulness.

Sandboxing Fundamentals

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
import subprocess
import tempfile
import os

@dataclass
class SandboxConfig:
    """Configuration for agent sandbox.

    The three collection fields default to ``None``, meaning "nothing
    configured"; they are annotated ``Optional`` so the declared types match
    the defaults.
    """
    # Memory ceiling in megabytes.
    max_memory_mb: int = 512
    # Share of one CPU the sandbox may use, as a percentage (0-100).
    max_cpu_percent: float = 50.0
    # Wall-clock limit for a single execution, in seconds.
    max_execution_time_seconds: int = 60
    # Whether sandboxed code may use the network at all.
    allowed_network: bool = False
    # Host paths the sandbox may see; None means none.
    allowed_filesystem_paths: Optional[List[str]] = None
    # Commands the sandbox may run; None means none configured.
    allowed_commands: Optional[List[str]] = None
    # Extra environment variables to expose to sandboxed code.
    environment_variables: Optional[Dict[str, str]] = None

class Sandbox(ABC):
    """Interface shared by every sandbox backend.

    A concrete backend stores the configuration it was given and must
    provide both ``execute`` and ``cleanup``.
    """

    def __init__(self, config: SandboxConfig):
        # Kept on the instance so subclasses can read limits from it.
        self.config = config

    @abstractmethod
    def execute(self, code: str) -> Dict[str, Any]:
        """Run ``code`` inside the sandbox and return a result mapping."""

    @abstractmethod
    def cleanup(self):
        """Release whatever resources the sandbox is holding."""

class ProcessSandbox(Sandbox):
    """Sandbox using subprocess with restrictions.

    The restrictions set up here are a wall-clock timeout, a filtered
    environment and, on Linux only, a systemd memory cap.
    """

    def execute(self, code: str) -> Dict[str, Any]:
        """Execute Python code in an isolated subprocess.

        Args:
            code: Python source text to run.

        Returns:
            On completion, a dict with ``success``, ``stdout``, ``stderr``
            and ``return_code``. On timeout, a dict with ``success`` False,
            ``error`` and ``timeout`` True.
        """

        # delete=False: the file has to outlive this ``with`` block so the
        # child process can read it; it is removed in ``finally`` below.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
            f.write(code)
            code_file = f.name

        try:
            # Build command with resource limits
            cmd = self._build_command(code_file)

            # Execute with timeout
            result = subprocess.run(
                cmd,
                capture_output=True,
                timeout=self.config.max_execution_time_seconds,
                text=True,
                env=self._build_environment()
            )

            return {
                "success": result.returncode == 0,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "return_code": result.returncode
            }

        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "error": "Execution timeout exceeded",
                "timeout": True
            }

        finally:
            os.unlink(code_file)

    def _build_command(self, code_file: str) -> List[str]:
        """Build the execution command, adding a memory limit where possible.

        Args:
            code_file: Path of the script to run.

        Returns:
            The argv list to pass to ``subprocess.run``.
        """
        import sys

        # Basic Python execution
        cmd = [sys.executable, code_file]

        # systemd-run only exists on Linux. ``os.name == 'posix'`` is also
        # true on macOS and the BSDs, where prepending it makes the launch
        # fail, so test the platform itself.
        if sys.platform.startswith('linux'):
            # Memory limit
            cmd = ['systemd-run', '--scope', '-p',
                   f'MemoryMax={self.config.max_memory_mb}M'] + cmd

        return cmd

    def _build_environment(self) -> Dict[str, str]:
        """Build the restricted environment for the child process.

        Starts from a copy of the current environment, drops every variable
        whose upper-cased name contains one of the sensitive markers, then
        applies ``config.environment_variables`` on top.
        """
        env = os.environ.copy()

        # Remove sensitive variables
        sensitive = ['AWS_SECRET', 'API_KEY', 'PASSWORD', 'TOKEN']
        for key in list(env.keys()):
            if any(s in key.upper() for s in sensitive):
                del env[key]

        # Add custom variables
        if self.config.environment_variables:
            env.update(self.config.environment_variables)

        return env

    def cleanup(self):
        """Nothing to release: the temporary script is removed in ``execute``."""
        pass

Docker-Based Sandboxing

import docker
from docker.types import Mount
import json

class DockerSandbox(Sandbox):
    """Sandbox using Docker containers."""

    def __init__(self, config: SandboxConfig, image: str = "python:3.11-slim"):
        """Connect to the Docker daemon described by the environment.

        Args:
            config: Limits applied to each container.
            image: Image the code is run in.
        """
        super().__init__(config)
        self.image = image
        self.client = docker.from_env()
        self.container = None

    def execute(self, code: str) -> Dict[str, Any]:
        """Execute code in a Docker container.

        Args:
            code: Python source passed to ``python -c`` inside the container.

        Returns:
            On completion, a dict with ``success``, ``stdout``, ``stderr``
            and ``return_code``. On timeout, a dict with ``success`` False,
            ``error`` and ``timeout`` True. On a Docker API error, a dict
            with ``success`` False and ``error``.
        """

        try:
            # Create container
            self.container = self.client.containers.run(
                self.image,
                command=["python", "-c", code],
                detach=True,
                mem_limit=f"{self.config.max_memory_mb}m",
                cpu_period=100000,
                # quota / period = max_cpu_percent / 100 of one CPU
                cpu_quota=int(self.config.max_cpu_percent * 1000),
                network_mode="none" if not self.config.allowed_network else "bridge",
                read_only=True,
                security_opt=["no-new-privileges"],
                cap_drop=["ALL"],
                mounts=self._build_mounts()
            )

            # Wait for completion
            try:
                result = self.container.wait(
                    timeout=self.config.max_execution_time_seconds
                )
            except docker.errors.APIError:
                # APIError is itself an OSError subclass; hand it to the
                # outer handler rather than reporting it as a timeout.
                raise
            except OSError:
                # The SDK surfaces an exceeded wait timeout as a requests
                # transport error (ReadTimeout / ConnectionError), and those
                # derive from OSError. ``finally`` below force-removes the
                # container that is still running.
                return {
                    "success": False,
                    "error": "Execution timeout exceeded",
                    "timeout": True
                }

            # Get output
            stdout = self.container.logs(stdout=True, stderr=False).decode()
            stderr = self.container.logs(stdout=False, stderr=True).decode()

            return {
                "success": result["StatusCode"] == 0,
                "stdout": stdout,
                "stderr": stderr,
                "return_code": result["StatusCode"]
            }

        except docker.errors.APIError as e:
            return {
                "success": False,
                "error": str(e)
            }

        finally:
            self.cleanup()

    def _build_mounts(self) -> List[Mount]:
        """Build read-only bind mounts for each allowed filesystem path."""
        mounts = []

        if self.config.allowed_filesystem_paths:
            for path in self.config.allowed_filesystem_paths:
                mounts.append(Mount(
                    target=path,
                    source=path,
                    type="bind",
                    read_only=True
                ))

        return mounts

    def cleanup(self):
        """Force-remove the current container, if any (best effort)."""
        if self.container:
            try:
                self.container.remove(force=True)
            except Exception:
                # Removal stays best-effort, but a bare ``except`` would
                # also swallow KeyboardInterrupt and SystemExit.
                pass
            self.container = None

Network Isolation

from dataclasses import dataclass
from typing import Set
import socket

@dataclass
class NetworkPolicy:
    """Network access policy.

    The set-valued fields default to ``None``, meaning "no restriction of
    that kind is configured".
    """
    # Master switch: when False every connection is refused.
    allow_outbound: bool = False
    # When non-empty, only these hosts may be contacted.
    allowed_hosts: Optional[Set[str]] = None
    # When non-empty, only these ports may be contacted.
    allowed_ports: Optional[Set[int]] = None
    # Host names or IP addresses that are always refused.
    blocked_hosts: Optional[Set[str]] = None
    # Upper bound on simultaneously tracked connections.
    max_connections: int = 10

class NetworkSandbox:
    """Control network access for agents."""

    def __init__(self, policy: NetworkPolicy):
        self.policy = policy
        self.active_connections = 0

    def check_connection(self, host: str, port: int) -> bool:
        """Check if a connection is allowed by the policy.

        Args:
            host: Host name or IP address the agent wants to reach.
            port: Destination port.

        Returns:
            True when the policy permits the connection and the connection
            limit has not been reached, otherwise False.
        """

        if not self.policy.allow_outbound:
            return False

        # Check against blocked hosts
        if self.policy.blocked_hosts:
            if host in self.policy.blocked_hosts:
                return False

            # Also check IP addresses
            try:
                ip = socket.gethostbyname(host)
            except (OSError, UnicodeError, ValueError):
                # The host could not be resolved (lookup failure or a
                # malformed name). As before, the IP check is skipped and
                # the remaining rules decide; only the lookup's own failure
                # types are caught, not KeyboardInterrupt or SystemExit.
                pass
            else:
                if ip in self.policy.blocked_hosts:
                    return False

        # Check allowed hosts
        if self.policy.allowed_hosts:
            if host not in self.policy.allowed_hosts:
                return False

        # Check allowed ports
        if self.policy.allowed_ports:
            if port not in self.policy.allowed_ports:
                return False

        # Check connection limit
        if self.active_connections >= self.policy.max_connections:
            return False

        return True

    def open_connection(self, host: str, port: int) -> bool:
        """Count a new connection if the policy allows it.

        Returns:
            True when the connection was allowed and counted, else False.
        """
        if self.check_connection(host, port):
            self.active_connections += 1
            return True
        return False

    def close_connection(self) -> None:
        """Track connection closing; the counter never drops below zero."""
        if self.active_connections > 0:
            self.active_connections -= 1

class ProxiedNetworkSandbox(NetworkSandbox):
    """Network sandbox with proxy for logging."""

    def __init__(self, policy: NetworkPolicy, proxy_port: int = 8080):
        super().__init__(policy)
        self.proxy_port = proxy_port
        self.request_log = []

    def log_request(self, method: str, url: str, response_status: int):
        """Append one request record, stamped with the current time.

        Args:
            method: HTTP method of the request.
            url: Requested URL.
            response_status: Status code of the response.
        """
        # Imported here because the module never imports ``time``; without
        # it this method raised NameError on its first call.
        import time

        self.request_log.append({
            "timestamp": time.time(),
            "method": method,
            "url": url,
            "status": response_status
        })

    def get_request_summary(self) -> Dict:
        """Get summary of network requests.

        Returns:
            A dict with the total request count, counts grouped by method
            and by status, and the ten most recent log entries.
        """
        return {
            "total_requests": len(self.request_log),
            "by_method": self._count_by_field("method"),
            "by_status": self._count_by_field("status"),
            "recent": self.request_log[-10:]
        }

    def _count_by_field(self, field: str) -> Dict:
        """Count log entries by the string form of ``field``."""
        from collections import Counter

        # Convert back to a plain dict so callers get the same type as before.
        return dict(Counter(str(req.get(field)) for req in self.request_log))

Filesystem Sandboxing

import os
import shutil
from pathlib import Path

class FilesystemSandbox:
    """Sandbox filesystem access to a single base directory."""

    def __init__(self, base_path: Optional[str] = None, max_size_mb: int = 100):
        """Set up the sandbox root and its write quota.

        Args:
            base_path: Directory that acts as the sandbox root. When
                omitted, a fresh temporary directory is created.
            max_size_mb: Total number of megabytes that may be written.
        """
        self.base_path = Path(base_path or tempfile.mkdtemp())
        self.max_size_mb = max_size_mb
        self.total_written = 0

    def get_path(self, relative_path: str) -> Path:
        """Resolve ``relative_path`` inside the sandbox.

        Args:
            relative_path: Path relative to the sandbox root.

        Returns:
            The resolved absolute path.

        Raises:
            SecurityError: If the resolved path lies outside the root.
        """
        # Resolve the root too: with a symlinked root (such as a symlinked
        # temp directory) an unresolved base would never match a resolved
        # candidate.
        base = self.base_path.resolve()
        full_path = (base / relative_path).resolve()

        # Compare path components, not string prefixes: with a root of
        # /tmp/agent_1, "/tmp/agent_10/x" passes a startswith() test even
        # though it is outside the sandbox.
        try:
            full_path.relative_to(base)
        except ValueError:
            raise SecurityError(f"Path traversal attempt: {relative_path}") from None

        return full_path

    def read_file(self, path: str) -> str:
        """Read a text file from the sandbox.

        Raises:
            FileNotFoundError: If the path does not exist.
            SecurityError: If the path escapes the sandbox.
        """
        full_path = self.get_path(path)

        if not full_path.exists():
            raise FileNotFoundError(path)

        return full_path.read_text()

    def write_file(self, path: str, content: str) -> bool:
        """Write a text file to the sandbox, creating parent directories.

        Returns:
            True once the file has been written.

        Raises:
            ResourceExhaustedError: If the write would exceed the quota.
            SecurityError: If the path escapes the sandbox.
        """
        full_path = self.get_path(path)

        # Check size limit. The counter only ever grows: overwrites and
        # deletions do not give quota back.
        new_size = len(content.encode())
        if (self.total_written + new_size) / (1024 * 1024) > self.max_size_mb:
            raise ResourceExhaustedError("Filesystem quota exceeded")

        # Create parent directories
        full_path.parent.mkdir(parents=True, exist_ok=True)

        # Write file
        full_path.write_text(content)
        self.total_written += new_size

        return True

    def list_directory(self, path: str = ".") -> List[str]:
        """List the entry names of a sandbox directory.

        Raises:
            NotADirectoryError: If the path is not a directory.
            SecurityError: If the path escapes the sandbox.
        """
        full_path = self.get_path(path)

        if not full_path.is_dir():
            raise NotADirectoryError(path)

        return [p.name for p in full_path.iterdir()]

    def delete_file(self, path: str) -> bool:
        """Delete a file or a whole directory tree from the sandbox.

        Returns:
            False when the path does not exist, True after deletion.
        """
        full_path = self.get_path(path)

        if not full_path.exists():
            return False

        if full_path.is_dir():
            shutil.rmtree(full_path)
        else:
            full_path.unlink()

        return True

    def cleanup(self):
        """Remove the sandbox root directory, if it exists."""
        if self.base_path.exists():
            shutil.rmtree(self.base_path)

class SecurityError(Exception):
    """Raised when a requested path resolves outside the sandbox root."""
    pass

class ResourceExhaustedError(Exception):
    """Raised when a write would exceed the sandbox's size quota."""
    pass

Integrated Sandbox Manager

class SandboxManager:
    """Manage complete agent sandboxing."""

    def __init__(self, config: SandboxConfig):
        """Create the shared process, network and filesystem sandboxes.

        Args:
            config: Limits shared by all sandboxes this manager creates.
        """
        self.config = config
        self.process_sandbox = ProcessSandbox(config)
        self.network_sandbox = NetworkSandbox(NetworkPolicy(
            allow_outbound=config.allowed_network
        ))
        self.filesystem_sandbox = FilesystemSandbox()
        # Per-agent filesystem sandboxes, kept so cleanup_agent can find and
        # remove the one belonging to a given agent.
        self._agent_filesystems: Dict[str, FilesystemSandbox] = {}

    def create_agent_environment(self, agent_id: str) -> Dict:
        """Create an isolated environment for an agent.

        Args:
            agent_id: Identifier used to name the agent's directory.
                NOTE(review): it is interpolated into a path unchecked;
                confirm callers only pass trusted identifiers.

        Returns:
            A dict with ``agent_id``, the agent's ``filesystem`` sandbox and
            its ``working_directory``.
        """

        # Create dedicated filesystem area
        agent_fs = FilesystemSandbox(
            base_path=f"/tmp/agent_{agent_id}",
            # NOTE(review): the memory limit doubles as the disk quota here;
            # confirm this is intended.
            max_size_mb=self.config.max_memory_mb
        )
        self._agent_filesystems[agent_id] = agent_fs

        return {
            "agent_id": agent_id,
            "filesystem": agent_fs,
            "working_directory": str(agent_fs.base_path)
        }

    def execute_in_sandbox(self, agent_id: str, code: str,
                          network_required: bool = False) -> Dict:
        """Execute code in the shared process sandbox.

        Args:
            agent_id: Accepted for the caller's bookkeeping; this method
                does not use it.
            code: Python source to run.
            network_required: Whether the code needs network access.

        Returns:
            The process sandbox's result dict, or an error dict when network
            access is required but not allowed by the configuration.
        """

        if network_required and not self.config.allowed_network:
            return {
                "success": False,
                "error": "Network access not allowed"
            }

        return self.process_sandbox.execute(code)

    def cleanup_agent(self, agent_id: str):
        """Clean up the resources held for ``agent_id``.

        Removes the agent's own filesystem sandbox if one was created, then
        cleans the shared filesystem and process sandboxes as before.
        """
        # Previously the per-agent directory was never removed and agent_id
        # was ignored, so every created environment leaked its directory.
        agent_fs = self._agent_filesystems.pop(agent_id, None)
        if agent_fs is not None:
            agent_fs.cleanup()

        self.filesystem_sandbox.cleanup()
        self.process_sandbox.cleanup()

Sandboxing is your last line of defense. Even trusted agents can behave unexpectedly; proper isolation ensures that unexpected behavior doesn’t become a catastrophe.

Michael John Peña

Michael John Peña

Senior Data Engineer based in Sydney. Writing about data, cloud, and technology.