diff --git a/README.md b/README.md
index 36c2830a..d0c29422 100644
--- a/README.md
+++ b/README.md
@@ -20,7 +20,19 @@
```bash
git clone https://github.com/tryanything-ai/anything.git
-pnpm dev
+pnpm i
+```
+
+### Start Backend
+
+```
+./start-dev.sh
+```
+
+### Start Frontend
+
+```
+pnpm dev --filter=web
```
## Systems
diff --git a/apps/web/src/components/studio/forms/testing/testing-tab.tsx b/apps/web/src/components/studio/forms/testing/testing-tab.tsx
index afcdb179..ef2fc434 100644
--- a/apps/web/src/components/studio/forms/testing/testing-tab.tsx
+++ b/apps/web/src/components/studio/forms/testing/testing-tab.tsx
@@ -1,9 +1,8 @@
-import { useEffect, useState } from "react";
+import { useEffect } from "react";
import { Button } from "@repo/ui/components/ui/button";
import { useAnything } from "@/context/AnythingContext";
import { Play, Loader2 } from "lucide-react";
import { TaskResult } from "./task-card";
-import { formatDuration, intervalToDuration } from "date-fns";
export default function TestingTab(): JSX.Element {
const {
@@ -18,24 +17,6 @@ export default function TestingTab(): JSX.Element {
workflow: { getActionIcon, setShowExplorer },
} = useAnything();
- // Local state to control minimum testing duration
- const [isTransitioning, setIsTransitioning] = useState(false);
- const [showTestingState, setShowTestingState] = useState(false);
-
- useEffect(() => {
- if (testingWorkflow) {
- setShowTestingState(true);
- setIsTransitioning(true);
- } else if (isTransitioning) {
- // When testing finishes, wait for minimum duration before hiding the testing state
- const timer = setTimeout(() => {
- setIsTransitioning(false);
- setShowTestingState(false);
- }, 800); // Minimum duration of 800ms for the testing state
- return () => clearTimeout(timer);
- }
- }, [testingWorkflow, isTransitioning]);
-
const runWorkflow = async () => {
try {
setShowExplorer(false);
@@ -50,7 +31,7 @@ export default function TestingTab(): JSX.Element {
// Clear any data or state related to the testing workflow when the component unmounts
resetState();
};
- }, []);
+ }, [resetState]);
return (
@@ -61,11 +42,11 @@ export default function TestingTab(): JSX.Element {
className="hover:bg-green-500 transition-all duration-300 min-w-[140px]"
disabled={testingWorkflow}
>
-
- {showTestingState ? (
+
+ {testingWorkflow ? (
<>
- Testing...
-
+ Testing...
+
>
) : (
<>
@@ -75,58 +56,27 @@ export default function TestingTab(): JSX.Element {
)}
-
- {testStartedTime && (
-
- {testFinishedTime && !isTransitioning
- ? "Complete"
- : "Running..."}
-
- )}
- {/* {testStartedTime && (
-
- {testFinishedTime && !isTransitioning
- ? formatDuration(
- intervalToDuration({
- start: new Date(testStartedTime),
- end: new Date(testFinishedTime),
- }),
- )
- : "Running..."}
-
- )} */}
-
+ {testStartedTime && (
+
+ {testFinishedTime ? "Complete" : "Running..."}
+
+ )}
- {(testingWorkflow || isTransitioning) &&
- worklowTestingSessionTasks.length === 0 && (
-
-
- Connecting to workflow session...
-
- )}
-
+ {testingWorkflow && worklowTestingSessionTasks.length === 0 && (
+
+
+ Connecting to workflow session...
+
+ )}
+
{worklowTestingSessionTasks.map((task, index) => (
-
-
-
+
))}
diff --git a/apps/web/src/components/tasks/task-table.tsx b/apps/web/src/components/tasks/task-table.tsx
index 2c62c286..07c7d121 100644
--- a/apps/web/src/components/tasks/task-table.tsx
+++ b/apps/web/src/components/tasks/task-table.tsx
@@ -180,13 +180,13 @@ export function TaskTable({
onClick={() => toggleExpand(task.task_id)}
>
- {task.result ? (
- expandedTaskIds.has(task.task_id) ? (
+ {/* {task.result ? ( */}
+ {expandedTaskIds.has(task.task_id) ? (
) : (
- )
- ) : null}
+ )}
+ {/* ) : null} */}
Result<(), Box> {
+ // Generate gRPC client code for JavaScript executor
+ tonic_build::configure()
+ .build_client(true)
+ .build_server(false)
+ .compile(
+ &["../js-server/proto/js_executor.proto"],
+ &["../js-server/proto"],
+ )?;
+ Ok(())
+}
diff --git a/core/anything-server/src/account_auth_middleware.rs b/core/anything-server/src/account_auth_middleware.rs
index 63825a3c..04f9a0a6 100644
--- a/core/anything-server/src/account_auth_middleware.rs
+++ b/core/anything-server/src/account_auth_middleware.rs
@@ -78,7 +78,7 @@ impl AccountAccessCache {
}
}
-async fn verify_account_access(
+pub async fn verify_account_access(
client: &postgrest::Postgrest,
jwt: &str,
user_id: &str,
@@ -106,9 +106,10 @@ pub async fn account_access_middleware(
next: Next,
) -> Result {
// Extract user_id from the existing auth middleware
- let user = request.extensions().get::().ok_or_else(|| {
- StatusCode::UNAUTHORIZED
- })?;
+ let user = request
+ .extensions()
+ .get::()
+ .ok_or_else(|| StatusCode::UNAUTHORIZED)?;
let user_id = &user.account_id;
// Extract account_id from path parameters
diff --git a/core/anything-server/src/actor_processor/workflow_actor.rs b/core/anything-server/src/actor_processor/workflow_actor.rs
index 86d0ea44..10a2885a 100644
--- a/core/anything-server/src/actor_processor/workflow_actor.rs
+++ b/core/anything-server/src/actor_processor/workflow_actor.rs
@@ -3,13 +3,16 @@ use crate::actor_processor::dependency_resolver::DependencyGraph;
use crate::actor_processor::messages::ActorMessage;
use crate::metrics::METRICS;
use crate::processor::components::{EnhancedSpanFactory, ProcessorError, WorkflowExecutionContext};
-use crate::processor::execute_task::TaskResult;
+
use crate::processor::processor::ProcessorMessage;
-use crate::types::task_types::Task;
+use crate::status_updater::{Operation, StatusUpdateMessage};
+use crate::types::task_types::{FlowSessionStatus, Task, TaskStatus, TriggerSessionStatus};
use crate::AppState;
+use chrono::Utc;
use opentelemetry::KeyValue;
use postgrest::Postgrest;
+use serde_json::{self, Value};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;
@@ -21,6 +24,7 @@ use uuid::Uuid;
pub struct WorkflowActor {
id: Uuid,
state: Arc,
+ #[allow(dead_code)]
client: Postgrest,
task_actor_pool: TaskActorPool,
span_factory: EnhancedSpanFactory,
@@ -46,7 +50,7 @@ impl WorkflowActor {
}
}
- pub async fn run(mut self, mut receiver: mpsc::Receiver) {
+ pub async fn run(self, mut receiver: mpsc::Receiver) {
info!("[WORKFLOW_ACTOR_{}] Starting workflow actor", self.id);
while let Some(message) = receiver.recv().await {
@@ -77,11 +81,11 @@ impl WorkflowActor {
);
}
- #[instrument(skip(self, message), fields(
- actor_id = %self.id,
- flow_session_id = %message.flow_session_id,
- workflow_id = %message.workflow_id
- ))]
+ // #[instrument(skip(self, message), fields(
+ // actor_id = %self.id,
+ // flow_session_id = %message.flow_session_id,
+ // workflow_id = %message.workflow_id
+ // ))]
async fn handle_execute_workflow(
&self,
message: ProcessorMessage,
@@ -167,24 +171,71 @@ impl WorkflowActor {
// Track currently running tasks
let running_tasks = Arc::new(RwLock::new(HashSet::::new()));
+ // Track failed filter tasks that should stop dependent actions
+ let failed_filters = Arc::new(RwLock::new(HashSet::::new()));
+
// Process tasks in dependency order
loop {
// Get ready actions that can be executed now
let ready_actions = {
let completed = completed_tasks.read().await;
let running = running_tasks.read().await;
- dependency_graph.get_ready_actions(actions, &completed, &running)
+ let failed = failed_filters.read().await;
+
+ let mut candidate_actions = dependency_graph.get_ready_actions(actions, &completed, &running);
+
+ // Filter out actions that depend on failed filters
+ candidate_actions.retain(|action| {
+ // Check if this action depends on any failed filters
+ let depends_on_failed_filter = dependency_graph.dependencies
+ .get(&action.action_id)
+ .map(|deps| {
+ deps.iter().any(|dep_action_id| failed.contains(dep_action_id))
+ })
+ .unwrap_or(false);
+
+ if depends_on_failed_filter {
+ info!(
+ "[WORKFLOW_ACTOR_{}] Skipping action {} because it depends on a failed filter",
+ self.id, action.action_id
+ );
+ false
+ } else {
+ true
+ }
+ });
+
+ candidate_actions
};
if ready_actions.is_empty() {
- // Check if all tasks are completed
+ // Check if all runnable tasks are completed
let completed = completed_tasks.read().await;
+ let failed = failed_filters.read().await;
let total_completed = completed.len();
-
- if total_completed == actions.len() {
+
+ // Count actions that are blocked by failed filters (will never run)
+ let blocked_actions = actions.iter().filter(|action| {
+ // Skip if already completed
+ if completed.values().any(|task| task.action_id == action.action_id) {
+ return false;
+ }
+
+ // Check if this action depends on any failed filters
+ dependency_graph.dependencies
+ .get(&action.action_id)
+ .map(|deps| {
+ deps.iter().any(|dep_action_id| failed.contains(dep_action_id))
+ })
+ .unwrap_or(false)
+ }).count();
+
+ let total_runnable = actions.len() - blocked_actions;
+
+ if total_completed == total_runnable {
info!(
- "[WORKFLOW_ACTOR_{}] All {} tasks completed successfully",
- self.id, total_completed
+ "[WORKFLOW_ACTOR_{}] All {} runnable tasks completed successfully ({} blocked by failed filters)",
+ self.id, total_completed, blocked_actions
);
break;
} else {
@@ -236,8 +287,28 @@ impl WorkflowActor {
.convert_action_to_task(&action, &message, 0) // processing_order not used in dependency-based execution
.await?;
- // ๐ TASK CREATION - Would normally create task in database
- info!("๐ TASK CREATION: Creating task {} for action {} (skipping database creation for debugging)", task.task_id, action.action_id);
+ // Send task creation message to database
+ let create_task_message = StatusUpdateMessage {
+ operation: Operation::CreateTask {
+ task_id: task.task_id,
+ account_id: message.workflow_version.account_id,
+ flow_session_id: context.flow_session_id,
+ input: task.clone(),
+ },
+ };
+
+ if let Err(e) = self
+ .state
+ .task_updater_sender
+ .send(create_task_message)
+ .await
+ {
+ error!(
+ "[WORKFLOW_ACTOR_{}] Failed to send create task message for {}: {}",
+ self.id, task.task_id, e
+ );
+ return Err(format!("Failed to send task creation message: {}", e).into());
+ }
info!(
"[WORKFLOW_ACTOR_{}] Created and executing task {} for action {}",
@@ -252,6 +323,29 @@ impl WorkflowActor {
context.span.clone(),
);
+ // Capture data needed for task completion handling
+ let action_data = (
+ action.label.clone(),
+ action.r#type.clone(),
+ action.plugin_name.clone(),
+ action.plugin_version.clone(),
+ action.inputs.clone().unwrap_or_default(),
+ action.inputs_schema.clone(),
+ action.plugin_config.clone(),
+ action.plugin_config_schema.clone(),
+ );
+ let message_data = (
+ message.workflow_version.account_id,
+ message.workflow_version.flow_version_id,
+ message
+ .trigger_task
+ .as_ref()
+ .map(|t| t.task_id.to_string())
+ .unwrap_or_default(),
+ message.trigger_session_id,
+ message.workflow_version.published,
+ );
+
// Execute task using actor pool with in-memory tasks for bundling
let completed_tasks_clone = Arc::clone(&completed_tasks);
let running_tasks_clone = Arc::clone(&running_tasks);
@@ -277,7 +371,7 @@ impl WorkflowActor {
running.remove(&action_id);
}
- (task_id, action_id, result)
+ (task_id, action_id, result, action_data, message_data)
});
task_futures.push(task_future);
@@ -286,7 +380,7 @@ impl WorkflowActor {
// Wait for this batch of tasks to complete
for task_future in task_futures {
match task_future.await {
- Ok((task_id, action_id, result)) => {
+ Ok((task_id, action_id, result, action_data, message_data)) => {
match result {
Ok(task_result) => {
info!(
@@ -294,40 +388,107 @@ impl WorkflowActor {
self.id, task_id, action_id
);
+ // Extract result and context from TaskResult tuple
+ let (result_value, context_value, started_at, ended_at) =
+ match &task_result {
+ Ok((result, context, start, end)) => (
+ result.clone(),
+ Some(context.clone()),
+ Some(*start),
+ Some(*end),
+ ),
+ Err(_) => (None, None, None, None),
+ };
+
+ // Send task completion update to database
+ let task_update_message = StatusUpdateMessage {
+ operation: Operation::UpdateTask {
+ task_id,
+ account_id: message_data.0, // account_id from message_data
+ flow_session_id: context.flow_session_id,
+ status: TaskStatus::Completed,
+ result: result_value.clone(),
+ context: context_value.clone(),
+ error: None,
+ started_at,
+ ended_at,
+ },
+ };
+
+ if let Err(e) = self
+ .state
+ .task_updater_sender
+ .send(task_update_message)
+ .await
+ {
+ error!(
+ "[WORKFLOW_ACTOR_{}] Failed to send task completion update for {}: {}",
+ self.id, task_id, e
+ );
+ }
+
// Store completed task with its result for future bundling
// Create a minimal task for in-memory storage
- //TODO: this seems kinda dangerous since some of this data is false!
- let mut completed_task = Task {
+ let (
+ action_label,
+ action_type,
+ plugin_name,
+ plugin_version,
+ inputs,
+ inputs_schema,
+ plugin_config,
+ plugin_config_schema,
+ ) = action_data;
+ let (
+ account_id,
+ flow_version_id,
+ trigger_id,
+ trigger_session_id,
+ published,
+ ) = message_data;
+
+ // Clone values before they get moved into the Task struct
+ let plugin_name_for_filter_check = plugin_name.clone();
+ let result_value_for_filter_check = result_value.clone();
+
+ info!(
+ "[WORKFLOW_ACTOR_{}] Completed task {} (action {}) with result {:?}",
+ self.id, task_id, action_id, result_value
+ );
+
+ let completed_task = Task {
task_id,
- account_id: Uuid::new_v4(), // Placeholder
- task_status: crate::types::task_types::TaskStatus::Completed,
+ account_id,
+ task_status: TaskStatus::Completed,
flow_id: context.workflow_id,
- flow_version_id: Uuid::new_v4(), // Placeholder
- action_label: "".to_string(), // Placeholder
- trigger_id: "".to_string(), // Placeholder
- trigger_session_id: Uuid::new_v4(), // Placeholder
- trigger_session_status:
- crate::types::task_types::TriggerSessionStatus::Completed,
+ flow_version_id,
+ action_label,
+ trigger_id,
+ trigger_session_id,
+ trigger_session_status: TriggerSessionStatus::Completed,
flow_session_id: context.flow_session_id,
- flow_session_status:
- crate::types::task_types::FlowSessionStatus::Running,
+ flow_session_status: FlowSessionStatus::Running,
action_id: action_id.clone(),
- r#type: crate::types::action_types::ActionType::Action,
- plugin_name: None,
- plugin_version: None,
- stage: crate::types::task_types::Stage::Production,
+ r#type: action_type,
+ plugin_name: Some(plugin_name),
+ plugin_version: Some(plugin_version),
+ stage: if published {
+ crate::types::task_types::Stage::Production
+ } else {
+ crate::types::task_types::Stage::Testing
+ },
test_config: None,
config: crate::types::task_types::TaskConfig {
- inputs: None,
- inputs_schema: None,
- plugin_config: None,
- plugin_config_schema: None,
+ inputs: Some(inputs),
+ inputs_schema,
+ plugin_config: Some(plugin_config),
+ plugin_config_schema: Some(plugin_config_schema),
},
- context: None,
- started_at: None,
- ended_at: None,
+ context: context_value,
+ started_at,
+ ended_at,
debug_result: None,
- result: None,
+ result: result_value,
error: None,
archived: false,
updated_at: None,
@@ -337,10 +498,42 @@ impl WorkflowActor {
processing_order: 0,
};
- // Extract result from TaskResult tuple
- if let Ok((result_value, context_value, _, _)) = &task_result {
- completed_task.result = result_value.clone();
- completed_task.context = Some(context_value.clone());
+ // Check if this is a filter task that failed (returned null)
+ // The filter plugin already handles truthiness evaluation and returns null for failed filters
+ if plugin_name_for_filter_check.as_str() == "@anything/filter" {
+ let should_stop_path = match &result_value_for_filter_check {
+ Some(Value::Null) => {
+ info!(
+ "[WORKFLOW_ACTOR_{}] Filter task {} failed, stopping dependent actions",
+ self.id, task_id
+ );
+ true
+ }
+ Some(_) => {
+ info!(
+ "[WORKFLOW_ACTOR_{}] Filter task {} passed, continuing execution",
+ self.id, task_id
+ );
+ false
+ }
+ None => {
+ info!(
+ "[WORKFLOW_ACTOR_{}] Filter task {} returned no result, stopping dependent actions",
+ self.id, task_id
+ );
+ true
+ }
+ };
+
+ // If the filter failed, add it to the failed filters set
+ if should_stop_path {
+ let mut failed = failed_filters.write().await;
+ failed.insert(action_id.clone());
+ info!(
+ "[WORKFLOW_ACTOR_{}] Added failed filter {} to failed_filters set",
+ self.id, action_id
+ );
+ }
}
{
@@ -354,9 +547,57 @@ impl WorkflowActor {
self.id, task_id, action_id, e
);
- // ๐ฅ WORKFLOW FAILURE - Would normally send workflow failure status to database
- info!("๐ฅ WORKFLOW FAILURE: Workflow {} failed due to task {} failure (skipping database update for debugging)", context.flow_session_id, task_id);
- //TODO: we should probably send a failure status update for the task as well
+ // Send task failure update to database
+ let task_error_message = StatusUpdateMessage {
+ operation: Operation::UpdateTask {
+ task_id,
+ account_id: message_data.0, // account_id from message_data
+ flow_session_id: context.flow_session_id,
+ status: TaskStatus::Failed,
+ result: None,
+ context: None,
+ error: Some(serde_json::json!({
+ "error": e.to_string(),
+ "error_type": "task_execution_error"
+ })),
+ started_at: None,
+ ended_at: Some(Utc::now()),
+ },
+ };
+
+ if let Err(send_err) = self
+ .state
+ .task_updater_sender
+ .send(task_error_message)
+ .await
+ {
+ error!(
+ "[WORKFLOW_ACTOR_{}] Failed to send task error update for {}: {}",
+ self.id, task_id, send_err
+ );
+ }
+
+ // Send workflow failure status to database
+ let workflow_failure_message = StatusUpdateMessage {
+ operation: Operation::CompleteWorkflow {
+ flow_session_id: context.flow_session_id,
+ account_id: message_data.0, // account_id from message_data
+ status: FlowSessionStatus::Failed,
+ trigger_status: TriggerSessionStatus::Failed,
+ },
+ };
+
+ if let Err(send_err) = self
+ .state
+ .task_updater_sender
+ .send(workflow_failure_message)
+ .await
+ {
+ error!(
+ "[WORKFLOW_ACTOR_{}] Failed to send workflow failure update: {}",
+ self.id, send_err
+ );
+ }
return Err(format!("Task {} failed: {:?}", task_id, e).into());
}
@@ -373,8 +614,32 @@ impl WorkflowActor {
}
}
- // ๐ WORKFLOW COMPLETED - Would normally send workflow completion status to database
- info!("๐ WORKFLOW COMPLETED: Workflow {} finished successfully with all tasks completed (skipping database update for debugging)", context.flow_session_id);
+ // Send workflow completion status to database
+ let workflow_completion_message = StatusUpdateMessage {
+ operation: Operation::CompleteWorkflow {
+ flow_session_id: context.flow_session_id,
+ account_id: message.workflow_version.account_id,
+ status: FlowSessionStatus::Completed,
+ trigger_status: TriggerSessionStatus::Completed,
+ },
+ };
+
+ if let Err(e) = self
+ .state
+ .task_updater_sender
+ .send(workflow_completion_message)
+ .await
+ {
+ error!(
+ "[WORKFLOW_ACTOR_{}] Failed to send workflow completion update: {}",
+ self.id, e
+ );
+ }
+
+ info!(
+ "[WORKFLOW_ACTOR_{}] Workflow {} completed successfully with all tasks completed",
+ self.id, context.flow_session_id
+ );
Ok(())
}
diff --git a/core/anything-server/src/main.rs b/core/anything-server/src/main.rs
index 4399f9d0..4b2b680b 100644
--- a/core/anything-server/src/main.rs
+++ b/core/anything-server/src/main.rs
@@ -23,7 +23,7 @@ use tower_http::set_header::SetResponseHeaderLayer;
use tokio::sync::mpsc;
use aws_sdk_s3::Client as S3Client;
use files::r2_client::get_r2_client;
-use tokio::signal::unix::{signal, SignalKind};
+use tokio::signal;
use tokio::time::sleep;
use dashmap::DashMap;
@@ -59,6 +59,7 @@ mod testing;
mod trigger_engine;
mod agents;
mod metrics;
+mod websocket;
use tokio::sync::oneshot;
use std::sync::atomic::AtomicBool;
@@ -100,8 +101,7 @@ pub struct AppState {
bundler_accounts_cache: DashMap,
shutdown_signal: Arc,
// WebSocket infrastructure
- // websocket_connections: DashMap,
- // workflow_broadcaster: websocket::WorkflowBroadcaster,
+ websocket_manager: Arc,
}
// #[tokio::main(flavor = "multi_thread", worker_threads = 1)]
@@ -214,7 +214,7 @@ async fn main() {
let (task_updater_tx, task_updater_rx) = mpsc::channel::(100000);
// Create WebSocket infrastructure
-// let (workflow_broadcaster, _) = broadcast::channel(1000);
+ let websocket_manager = Arc::new(websocket::WebSocketManager::new());
let default_http_timeout = Duration::from_secs(30); // Default 30-second timeout
let http_client = Client::builder()
@@ -241,6 +241,7 @@ async fn main() {
bundler_accounts_cache: DashMap::new(),
shutdown_signal: Arc::new(AtomicBool::new(false)),
task_updater_sender: task_updater_tx.clone(), // Store the sender in AppState
+ websocket_manager: websocket_manager.clone(),
});
pub async fn root() -> impl IntoResponse {
@@ -430,6 +431,12 @@ pub async fn root() -> impl IntoResponse {
.route("/account/:account_id/file/:file_id", delete(files::routes::delete_file))
.route("/account/:account_id/file/:file_id/download", get(files::routes::get_file_download_url))
+ // WebSocket connections
+ .route("/ws/:connection_id", get(websocket::websocket_handler))
+
+ // Workflow testing WebSocket connections
+ .route("/account/:account_id/testing/workflow/session/:flow_session_id/ws", get(websocket::workflow_testing_websocket_handler))
+
.layer(middleware::from_fn_with_state(
state.clone(),
account_auth_middleware::account_access_middleware,
@@ -480,20 +487,49 @@ pub async fn root() -> impl IntoResponse {
let state_clone = state.clone();
tokio::spawn(async move {
- let mut sigterm = signal(SignalKind::terminate()).unwrap();
- sigterm.recv().await;
- info!("Received SIGTERM signal");
-
- // Set the shutdown signal
- state_clone.shutdown_signal.store(true, std::sync::atomic::Ordering::SeqCst);
+ #[cfg(unix)]
+ {
+ use tokio::signal::unix::{signal, SignalKind};
+ if let Ok(mut sigterm) = signal(SignalKind::terminate()) {
+ sigterm.recv().await;
+ info!("Received SIGTERM signal");
+
+ // Set the shutdown signal
+ state_clone.shutdown_signal.store(true, std::sync::atomic::Ordering::SeqCst);
+
+ // Give time for in-flight operations to complete
+ sleep(Duration::from_secs(20)).await;
+ }
+ }
- // Give time for in-flight operations to complete
- sleep(Duration::from_secs(20)).await;
+ #[cfg(not(unix))]
+ {
+ // For non-Unix systems, just wait for Ctrl+C
+ if let Ok(()) = signal::ctrl_c().await {
+ info!("Received Ctrl+C signal");
+
+ // Set the shutdown signal
+ state_clone.shutdown_signal.store(true, std::sync::atomic::Ordering::SeqCst);
+
+ // Give time for in-flight operations to complete
+ sleep(Duration::from_secs(20)).await;
+ }
+ }
});
// Run the API server
- let listener = tokio::net::TcpListener::bind(&bind_address).await.unwrap();
- axum::serve(listener, app).await.unwrap();
+ let listener = tokio::net::TcpListener::bind(&bind_address).await
+ .unwrap_or_else(|e| {
+ error!("[MAIN] Failed to bind to address {}: {}", bind_address, e);
+ panic!("Cannot bind to address {}. Check if port is already in use: {}", bind_address, e);
+ });
+
+ info!("[MAIN] Server listening on {}", bind_address);
+
+ if let Err(e) = axum::serve(listener, app).await {
+ error!("[MAIN] Server failed to run: {}", e);
+ panic!("Server error: {}", e);
+ }
// Add this with your other spawned tasks:
// tokio::spawn(periodic_thread_warmup(state.clone()));
diff --git a/core/anything-server/src/processor/execute_task.rs b/core/anything-server/src/processor/execute_task.rs
index e3cf41fc..e6dbbf2e 100644
--- a/core/anything-server/src/processor/execute_task.rs
+++ b/core/anything-server/src/processor/execute_task.rs
@@ -202,7 +202,7 @@ async fn execute_plugin_inner(
process_http_task(&state.http_client, bundled_plugin_config).await
}
"@anything/filter" => {
- info!("[EXECUTE_TASK] Executing filter plugin with RustyScript worker");
+ info!("[EXECUTE_TASK] Executing filter plugin with gRPC JavaScript executor");
process_filter_task(bundled_inputs, bundled_plugin_config).await
}
"@anything/javascript" => {
diff --git a/core/anything-server/src/processor/parallelizer.rs b/core/anything-server/src/processor/parallelizer.rs
index b938518a..562fb6a8 100644
--- a/core/anything-server/src/processor/parallelizer.rs
+++ b/core/anything-server/src/processor/parallelizer.rs
@@ -302,6 +302,7 @@ impl EnhancedParallelProcessor {
let task_message = StatusUpdateMessage {
operation: Operation::CompleteWorkflow {
flow_session_id: self.context.flow_session_id,
+ account_id: self.context.workflow.account_id,
status: FlowSessionStatus::Completed,
trigger_status: TriggerSessionStatus::Completed,
},
diff --git a/core/anything-server/src/processor/path_processor.rs b/core/anything-server/src/processor/path_processor.rs
index 87fb060d..f969f9b4 100644
--- a/core/anything-server/src/processor/path_processor.rs
+++ b/core/anything-server/src/processor/path_processor.rs
@@ -357,6 +357,8 @@ impl EnhancedBranchProcessor {
let error_update = StatusUpdateMessage {
operation: Operation::UpdateTask {
task_id: task.task_id,
+ flow_session_id: self.context.flow_session_id,
+ account_id: self.context.workflow.account_id,
started_at: None,
ended_at: Some(chrono::Utc::now()),
status: TaskStatus::Failed,
@@ -384,6 +386,7 @@ impl EnhancedBranchProcessor {
let workflow_failure = StatusUpdateMessage {
operation: Operation::CompleteWorkflow {
flow_session_id: self.context.flow_session_id,
+ account_id: self.context.workflow.account_id,
status: FlowSessionStatus::Failed,
trigger_status: TriggerSessionStatus::Failed,
},
diff --git a/core/anything-server/src/processor/processor_utils.rs b/core/anything-server/src/processor/processor_utils.rs
index 7b8418b3..04e5c4ab 100644
--- a/core/anything-server/src/processor/processor_utils.rs
+++ b/core/anything-server/src/processor/processor_utils.rs
@@ -35,6 +35,8 @@ pub async fn create_task(
let create_task_message = StatusUpdateMessage {
operation: Operation::CreateTask {
task_id: task.task_id.clone(),
+ account_id: ctx.workflow.account_id,
+ flow_session_id: ctx.flow_session_id,
input: task.clone(),
},
};
@@ -137,6 +139,8 @@ pub async fn create_task_for_action(
let create_task_message = StatusUpdateMessage {
operation: Operation::CreateTask {
task_id: task.task_id.clone(),
+ account_id: ctx.workflow.account_id,
+ flow_session_id: ctx.flow_session_id,
input: task.clone(),
},
};
@@ -287,6 +291,8 @@ pub async fn update_completed_task_with_result(
let task_message = StatusUpdateMessage {
operation: Operation::UpdateTask {
task_id: task.task_id.clone(),
+ account_id: ctx.workflow.account_id,
+ flow_session_id: ctx.flow_session_id,
status: TaskStatus::Completed,
result: task_result.clone(),
error: None,
@@ -320,6 +326,8 @@ pub async fn handle_task_error(
let error_message = StatusUpdateMessage {
operation: Operation::UpdateTask {
task_id: task.task_id.clone(),
+ account_id: ctx.workflow.account_id,
+ flow_session_id: ctx.flow_session_id,
status: TaskStatus::Failed,
result: None,
error: Some(error.error.clone()),
@@ -352,6 +360,29 @@ pub async fn process_task(
);
let started_at = Utc::now();
+
+ // Send running status update for websocket
+ let running_message = StatusUpdateMessage {
+ operation: Operation::UpdateTask {
+ task_id: task.task_id.clone(),
+ account_id: ctx.workflow.account_id,
+ flow_session_id: ctx.flow_session_id,
+ status: TaskStatus::Running,
+ result: None,
+ error: None,
+ context: None,
+ started_at: Some(started_at),
+ ended_at: None,
+ },
+ };
+
+ if let Err(e) = ctx.state.task_updater_sender.send(running_message).await {
+ warn!(
+ "[PROCESSOR_UTILS] Failed to send running status update: {}",
+ e
+ );
+ }
+
let execution_start = Instant::now();
// Get a clone of in-memory tasks for bundling context
diff --git a/core/anything-server/src/status_updater/mod.rs b/core/anything-server/src/status_updater/mod.rs
index d8620242..689952e1 100644
--- a/core/anything-server/src/status_updater/mod.rs
+++ b/core/anything-server/src/status_updater/mod.rs
@@ -2,12 +2,13 @@ use crate::processor::db_calls::{create_task, update_flow_session_status, update
use crate::types::task_types::{FlowSessionStatus, Task, TaskStatus, TriggerSessionStatus};
use crate::AppState;
use crate::metrics::METRICS;
+use crate::websocket::WorkflowTestingUpdate;
use chrono::{DateTime, Utc};
use serde_json::Value;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc::Receiver;
-use tracing::{info, span, warn, Instrument, Level};
+use tracing::{info, span, Instrument, Level};
use uuid::Uuid;
// Define the type of task operation
@@ -15,6 +16,8 @@ use uuid::Uuid;
pub enum Operation {
UpdateTask {
task_id: Uuid,
+ account_id: Uuid,
+ flow_session_id: Uuid,
started_at: Option>,
ended_at: Option>,
status: TaskStatus,
@@ -24,10 +27,13 @@ pub enum Operation {
},
CreateTask {
task_id: Uuid,
+ account_id: Uuid,
+ flow_session_id: Uuid,
input: Task,
},
CompleteWorkflow {
flow_session_id: Uuid,
+ account_id: Uuid,
status: FlowSessionStatus,
trigger_status: TriggerSessionStatus,
},
@@ -106,6 +112,8 @@ pub async fn task_database_status_processor(
match &message.operation {
Operation::UpdateTask {
task_id,
+ account_id: _,
+ flow_session_id: _,
started_at,
ended_at,
status,
@@ -127,13 +135,14 @@ pub async fn task_database_status_processor(
})
.await
}
- Operation::CreateTask { task_id, input } => {
+ Operation::CreateTask { task_id, account_id: _, flow_session_id: _, input } => {
span!(Level::DEBUG, "create_task_db_call", task_id = %task_id).in_scope(|| {
create_task(state.clone(), input)
}).await
}
Operation::CompleteWorkflow {
flow_session_id,
+ account_id: _,
status,
trigger_status,
} => {
@@ -158,7 +167,9 @@ pub async fn task_database_status_processor(
METRICS.record_status_operation_success(operation_duration_ms, operation_type);
info!("[TASK PROCESSOR] Successfully processed update in {}ms", operation_duration_ms);
- // Removed WebSocket broadcast logic after successful database operations
+
+ // Broadcast WebSocket updates after successful database operations
+ broadcast_websocket_update(&state, &message.operation).await;
break;
}
Err(e) => {
@@ -213,6 +224,141 @@ pub async fn task_database_status_processor(
info!("[TASK PROCESSOR] Status updater processor shutdown complete");
}
+async fn get_current_tasks_for_session(state: &Arc, flow_session_id: &Uuid) -> Option {
+ let tasks_query = state
+ .anything_client
+ .from("tasks")
+ .select("task_id,action_label,task_status,result,error,created_at,started_at,ended_at")
+ .eq("flow_session_id", flow_session_id.to_string())
+ .order("created_at.asc")
+ .execute()
+ .await;
+
+ if let Ok(response) = tasks_query {
+ if let Ok(tasks_json) = response.text().await {
+ return serde_json::from_str(&tasks_json).ok();
+ }
+ }
+ None
+}
+
+async fn broadcast_websocket_update(state: &Arc, operation: &Operation) {
+ match operation {
+ Operation::UpdateTask {
+ task_id,
+ account_id,
+ flow_session_id,
+ status,
+ result,
+ error,
+ ..
+ } => {
+ let update_type = match status {
+ TaskStatus::Running => "task_updated",
+ TaskStatus::Completed => "task_completed",
+ TaskStatus::Failed => "task_failed",
+ _ => "task_updated",
+ };
+
+ // Fetch all current tasks for this flow session
+ let tasks_data = get_current_tasks_for_session(state, flow_session_id).await;
+
+ let update = WorkflowTestingUpdate {
+ r#type: "workflow_update".to_string(),
+ update_type: Some(update_type.to_string()),
+ flow_session_id: flow_session_id.to_string(),
+ data: Some(serde_json::json!({
+ "task_id": task_id,
+ "status": status,
+ "result": result,
+ "error": error
+ })),
+ tasks: tasks_data,
+ complete: None,
+ };
+
+ state.websocket_manager.broadcast_workflow_testing_update(
+ &account_id.to_string(),
+ &flow_session_id.to_string(),
+ update,
+ );
+
+ info!(
+ "[WEBSOCKET] Broadcasted task update for task {} in session {} to account {}",
+ task_id, flow_session_id, account_id
+ );
+ }
+ Operation::CreateTask {
+ task_id,
+ account_id,
+ flow_session_id,
+ ..
+ } => {
+ // Fetch all current tasks for this flow session
+ let tasks_data = get_current_tasks_for_session(state, flow_session_id).await;
+
+ let update = WorkflowTestingUpdate {
+ r#type: "workflow_update".to_string(),
+ update_type: Some("task_created".to_string()),
+ flow_session_id: flow_session_id.to_string(),
+ data: Some(serde_json::json!({
+ "task_id": task_id
+ })),
+ tasks: tasks_data,
+ complete: None,
+ };
+
+ state.websocket_manager.broadcast_workflow_testing_update(
+ &account_id.to_string(),
+ &flow_session_id.to_string(),
+ update,
+ );
+
+ info!(
+ "[WEBSOCKET] Broadcasted task creation for task {} in session {} to account {}",
+ task_id, flow_session_id, account_id
+ );
+ }
+ Operation::CompleteWorkflow {
+ flow_session_id,
+ account_id,
+ status,
+ trigger_status: _,
+ } => {
+ let update_type = match status {
+ FlowSessionStatus::Completed => "workflow_completed",
+ FlowSessionStatus::Failed => "workflow_failed",
+ _ => "workflow_updated",
+ };
+
+ // Fetch final tasks for this flow session
+ let tasks_data = get_current_tasks_for_session(state, flow_session_id).await;
+
+ let update = WorkflowTestingUpdate {
+ r#type: "workflow_update".to_string(),
+ update_type: Some(update_type.to_string()),
+ flow_session_id: flow_session_id.to_string(),
+ data: Some(serde_json::json!({
+ "status": status
+ })),
+ tasks: tasks_data,
+ complete: Some(matches!(status, FlowSessionStatus::Completed | FlowSessionStatus::Failed)),
+ };
+
+ state.websocket_manager.broadcast_workflow_testing_update(
+ &account_id.to_string(),
+ &flow_session_id.to_string(),
+ update,
+ );
+
+ info!(
+ "[WEBSOCKET] Broadcasted workflow completion for session {} to account {}",
+ flow_session_id, account_id
+ );
+ }
+ }
+}
+
diff --git a/core/anything-server/src/system_plugins/filter/mod.rs b/core/anything-server/src/system_plugins/filter/mod.rs
index f0c9bcdf..1c43ca51 100644
--- a/core/anything-server/src/system_plugins/filter/mod.rs
+++ b/core/anything-server/src/system_plugins/filter/mod.rs
@@ -1,12 +1,54 @@
-use rustyscript::worker::{DefaultWorker, DefaultWorkerOptions};
-use serde_json::{json, Value};
-use std::time::{Duration, Instant};
-use tracing::{error, info, instrument, warn};
+use serde_json::Value;
+use std::time::Instant;
+use tracing::{error, info, instrument};
use uuid::Uuid;
-/// Enhanced filter task processor optimized for the actor system
+// Import the JavaScript executor functionality
+use crate::system_plugins::javascript::{execute_javascript_grpc, JsExecutorManager};
+
+/// Auto-inject return statement if the condition doesn't already have one
+/// This allows users to write simple conditions like "inputs.value > 10" instead of "return inputs.value > 10"
+fn auto_inject_return_statement(code: &str) -> String {
+ let trimmed = code.trim();
+
+ // If the code is empty, return it as-is
+ if trimmed.is_empty() {
+ return code.to_string();
+ }
+
+ // Check if code already contains a return statement
+ // We look for "return" as a word boundary to avoid matching it in strings or variable names
+ let has_return = trimmed
+ .split_whitespace()
+ .any(|word| word.starts_with("return"));
+
+ // If it already has a return statement, use the code as-is
+ if has_return {
+ info!("[FILTER] Condition already contains 'return', using as-is");
+ return code.to_string();
+ }
+
+ // Check if this looks like a multi-statement block (contains semicolons or newlines with non-trivial content)
+ let is_complex = trimmed.contains(';')
+ || trimmed.lines().count() > 1
+ && trimmed
+ .lines()
+ .any(|line| !line.trim().is_empty() && !line.trim().starts_with("//"));
+
+ if is_complex {
+ // For complex code, wrap it in a function that returns the last expression
+ info!("[FILTER] Complex condition detected, wrapping in function with return");
+ format!("(() => {{ {} }})()", trimmed)
+ } else {
+ // For simple expressions, just prepend return
+ info!("[FILTER] Simple condition detected, prepending 'return'");
+ format!("return ({})", trimmed)
+ }
+}
+
+/// Enhanced filter task processor using gRPC JavaScript executor
/// This is used for conditional logic and boolean expressions
-/// Uses RustyScript workers for safe JavaScript execution
+/// Now uses the same gRPC JavaScript executor as the main JavaScript plugin
#[instrument(skip(bundled_inputs, bundled_plugin_config))]
pub async fn process_filter_task(
bundled_inputs: &Value,
@@ -14,10 +56,9 @@ pub async fn process_filter_task(
) -> Result