mirror of
https://github.com/Abdulazizzn/n8n-enterprise-unlocked.git
synced 2025-12-17 01:56:46 +00:00
refactor(core): Replace pickle with JSON serialization in native Python runner (#19123)
This commit is contained in:
@@ -6,7 +6,6 @@ import json
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
import pickle
|
|
||||||
|
|
||||||
from src.errors import (
|
from src.errors import (
|
||||||
TaskResultMissingError,
|
TaskResultMissingError,
|
||||||
@@ -27,7 +26,7 @@ from typing import Any, Set
|
|||||||
from multiprocessing.context import SpawnProcess
|
from multiprocessing.context import SpawnProcess
|
||||||
|
|
||||||
MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn")
|
MULTIPROCESSING_CONTEXT = multiprocessing.get_context("spawn")
|
||||||
MAX_PRINT_STATEMENTS_ALLOWED = 100
|
MAX_PRINT_ARGS_ALLOWED = 100
|
||||||
|
|
||||||
PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task
|
PrintArgs = list[list[Any]] # Args to all `print()` calls in a Python code task
|
||||||
|
|
||||||
@@ -105,8 +104,8 @@ class TaskExecutor:
|
|||||||
|
|
||||||
result_file = returned["result_file"]
|
result_file = returned["result_file"]
|
||||||
try:
|
try:
|
||||||
with open(result_file, "rb") as f:
|
with open(result_file, "r", encoding="utf-8") as f:
|
||||||
result = pickle.load(f)
|
result = json.load(f)
|
||||||
finally:
|
finally:
|
||||||
os.unlink(result_file)
|
os.unlink(result_file)
|
||||||
|
|
||||||
@@ -227,9 +226,13 @@ class TaskExecutor:
|
|||||||
queue: multiprocessing.Queue, result: list[Any], print_args: PrintArgs
|
queue: multiprocessing.Queue, result: list[Any], print_args: PrintArgs
|
||||||
):
|
):
|
||||||
with tempfile.NamedTemporaryFile(
|
with tempfile.NamedTemporaryFile(
|
||||||
mode="wb", delete=False, prefix="n8n_result_"
|
mode="w",
|
||||||
|
delete=False,
|
||||||
|
prefix="n8n_result_",
|
||||||
|
suffix=".json",
|
||||||
|
encoding="utf-8",
|
||||||
) as f:
|
) as f:
|
||||||
pickle.dump(result, f)
|
json.dump(result, f, default=str, ensure_ascii=False)
|
||||||
result_file = f.name
|
result_file = f.name
|
||||||
|
|
||||||
print_args_to_send = TaskExecutor._truncate_print_args(print_args)
|
print_args_to_send = TaskExecutor._truncate_print_args(print_args)
|
||||||
@@ -302,13 +305,13 @@ class TaskExecutor:
|
|||||||
def _truncate_print_args(print_args: PrintArgs) -> PrintArgs:
|
def _truncate_print_args(print_args: PrintArgs) -> PrintArgs:
|
||||||
"""Truncate print_args to prevent pipe buffer overflow."""
|
"""Truncate print_args to prevent pipe buffer overflow."""
|
||||||
|
|
||||||
if not print_args or len(print_args) <= MAX_PRINT_STATEMENTS_ALLOWED:
|
if not print_args or len(print_args) <= MAX_PRINT_ARGS_ALLOWED:
|
||||||
return print_args
|
return print_args
|
||||||
|
|
||||||
truncated = print_args[:MAX_PRINT_STATEMENTS_ALLOWED]
|
truncated = print_args[:MAX_PRINT_ARGS_ALLOWED]
|
||||||
truncated.append(
|
truncated.append(
|
||||||
[
|
[
|
||||||
f"[Output truncated - {len(print_args) - MAX_PRINT_STATEMENTS_ALLOWED} more print statements]"
|
f"[Output truncated - {len(print_args) - MAX_PRINT_ARGS_ALLOWED} more print statements]"
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user