diff --git a/packages/@n8n/task-runner-python/src/constants.py b/packages/@n8n/task-runner-python/src/constants.py index 64c88e5f0d..49b4e1c70c 100644 --- a/packages/@n8n/task-runner-python/src/constants.py +++ b/packages/@n8n/task-runner-python/src/constants.py @@ -57,7 +57,7 @@ TASK_REJECTED_REASON_OFFER_EXPIRED = ( TASK_REJECTED_REASON_AT_CAPACITY = "No open task slots - runner already at capacity" # Security -BUILTINS_DENY_DEFAULT = "eval,exec,compile,open,input,breakpoint,__import__,getattr,object,type,vars,setattr,delattr,hasattr,dir,memoryview,__build_class__" +BUILTINS_DENY_DEFAULT = "eval,exec,compile,open,input,breakpoint,getattr,object,type,vars,setattr,delattr,hasattr,dir,memoryview,__build_class__" ALWAYS_BLOCKED_ATTRIBUTES = { "__subclasses__", "__globals__", @@ -112,4 +112,7 @@ ERROR_RELATIVE_IMPORT = "Relative imports are disallowed." ERROR_STDLIB_DISALLOWED = "Import of standard library module '{module}' is disallowed. Allowed stdlib modules: {allowed}" ERROR_EXTERNAL_DISALLOWED = "Import of external package '{module}' is disallowed. Allowed external packages: {allowed}" ERROR_DANGEROUS_ATTRIBUTE = "Access to attribute '{attr}' is disallowed, because it can be used to bypass security restrictions." +ERROR_DYNAMIC_IMPORT = ( + "Dynamic __import__() calls are not allowed for security reasons." +) ERROR_SECURITY_VIOLATIONS = "Security violations detected:\n{violations}" diff --git a/packages/@n8n/task-runner-python/src/task_analyzer.py b/packages/@n8n/task-runner-python/src/task_analyzer.py index 759570bbca..6ddc997305 100644 --- a/packages/@n8n/task-runner-python/src/task_analyzer.py +++ b/packages/@n8n/task-runner-python/src/task_analyzer.py @@ -11,6 +11,7 @@ from src.constants import ( ERROR_STDLIB_DISALLOWED, ERROR_EXTERNAL_DISALLOWED, ERROR_DANGEROUS_ATTRIBUTE, + ERROR_DYNAMIC_IMPORT, ERROR_SECURITY_VIOLATIONS, ALWAYS_BLOCKED_ATTRIBUTES, UNSAFE_ATTRIBUTES, @@ -56,7 +57,7 @@ class SecurityValidator(ast.NodeVisitor): self.generic_visit(node) def visit_Attribute(self, node: ast.Attribute) -> None: - """Detect access to unsafe attributes that could bypass security.""" + """Detect access to unsafe attributes that could bypass security restrictions.""" if node.attr in UNSAFE_ATTRIBUTES: # Block regardless of context @@ -72,6 +73,35 @@ class SecurityValidator(ast.NodeVisitor): self.generic_visit(node) + def visit_Call(self, node: ast.Call) -> None: + """Detect calls to __import__() that could bypass security restrictions.""" + + is_import_call = ( + # __import__() + (isinstance(node.func, ast.Name) and node.func.id == "__import__") + or + # builtins.__import__() or __builtins__.__import__() + ( + isinstance(node.func, ast.Attribute) + and node.func.attr == "__import__" + and isinstance(node.func.value, ast.Name) + and node.func.value.id in {"builtins", "__builtins__"} + ) + ) + + if is_import_call: + if ( + node.args + and isinstance(node.args[0], ast.Constant) + and isinstance(node.args[0].value, str) + ): + module_name = node.args[0].value + self._validate_import(module_name, node.lineno) + else: + self._add_violation(node.lineno, ERROR_DYNAMIC_IMPORT) + + self.generic_visit(node) + # ========== Validation ========== def _validate_import(self, module_path: str, lineno: int) -> None: