
🚀 DSL Execution & Workflow Orchestration System

A declarative, distributed system for defining, registering, and executing graph-based AI workflows built from Python modules. It is modular and resource-controlled, with built-in human-in-the-loop and remote execution capabilities.

Project Status 🚧

  • Alpha: This project is in active development and subject to rapid change. ⚠️
  • Testing Phase: Features are experimental; expect bugs, incomplete functionality, and breaking changes. 🧪
  • Not Production-Ready: We do not recommend using this in production (or relying on it) right now. ⛔
  • Compatibility: APIs, schemas, and configuration may change without notice. 🔄
  • Feedback Welcome: Early feedback helps us stabilize future releases. 💬

🌟 Highlights

🧱 Modular Workflow Architecture

  • 🧩 Define and package DSL workflows using versioned, ZIP-based bundles
  • ⚙️ DAG-based or router-style execution using Python classes as modular nodes
  • 📦 Reusable modules with per-node settings, parameters, and requirements
  • 🔁 Supports both simple DAGs and advanced routing logic (loops, conditions)
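
The plain-DAG mode described above can be sketched with Python's standard graphlib. The graph and node shapes here are illustrative assumptions for the sketch, not this project's actual workflow.json schema:

```python
# Sketch: topological execution of a DAG of node callables.
# The graph/node structure is an illustrative assumption.
from graphlib import TopologicalSorter

def run_dag(graph, node_fns):
    """graph: node -> set of upstream nodes; node_fns: node -> callable(inputs dict)."""
    outputs = {}
    for node in TopologicalSorter(graph).static_order():
        # Each node receives the outputs of its upstream dependencies.
        inputs = {dep: outputs[dep] for dep in graph.get(node, ())}
        outputs[node] = node_fns[node](inputs)
    return outputs
```

Router-style execution differs in that the next node is chosen dynamically at runtime (enabling loops and conditions) rather than fixed by a topological order.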

🧠 Intelligent Execution Engine

  • 🧠 DSLExecutor SDK for local, multi-process, resource-limited task execution
  • 🔌 Addons support for LLMs, webhooks, and callbacks
  • ⌛ Task-level CPU, memory, and time enforcement
  • 🧍‍♂️ Human intervention support for manual decision points

🔐 Registry and Infra Integration

  • 🗂️ REST APIs to register, update, query, and delete DSL workflows
  • ☁️ Workflow archives are uploaded and unpacked via the Creator Server
  • ⚙️ Kubernetes-backed infra provisioning for executor deployment
  • 🔄 WebSocket and HTTP APIs to run workflows or execute individual nodes

📦 Use Cases

| Use Case | What It Solves |
| --- | --- |
| AI Pipeline Orchestration | Manage modular Python logic as reusable nodes in a DAG or conditional graph |
| Graph-based DSL Execution | Execute DSL workflows declaratively with clean separation of logic |
| Remote + Local Workflows | Load, prepare, and run DSLs across distributed clusters |
| Interactive Workflows | Integrate human feedback mid-flow using structured intervention logic |
| Versioned Modular Nodes | Define, test, and package Python node logic for reuse and distribution |

🧩 Integrations

| Component | Purpose |
| --- | --- |
| MongoDB | Metadata storage for registered DSLs |
| Flask | Creator and Registry API servers |
| Redis | Workflow state management and output collection |
| S3/Ceph | Workflow ZIP bundle and module storage |
| Kubernetes | Dynamic provisioning of DSL Executors |
| AddonsManager | LLM, webhook, and callback integration |
| Multiprocessing | Task-level parallelism with resource control |

💡 Why Use This?

| Problem | Our Solution |
| --- | --- |
| 🔹 Hard to manage modular AI workflows | DSL JSON format for workflows with reusable modules |
| 🔹 Complex logic orchestration | DAG + router mode for conditional/dynamic task flows |
| 🔹 No runtime controls for resources | Enforced CPU/memory/time limits per node |
| 🔹 Inconsistent module packaging | Structured ZIP archive format with workflow.json and module folders |
| 🔹 Lack of execution observability and control | State collection and Addons integration |

🛠 Project Status

  • 🟢 Actively maintained and evolving
  • 🧪 Local + remote execution modes
  • 🎛️ Integrated SDK and API layers
  • 📦 Workflow versioning, packaging, and remote execution
  • 🤝 Community feedback and contributions welcome


📚 DSL System Components

๐Ÿ— DSL Workflow Definition

  • A DSL is defined using workflow.json + module_*/ directories
  • Each module contains a function.py with a class implementing eval(...)
  • workflow.json describes the graph, settings, and parameters
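
A node module's function.py might look like the following. The class name, the exact eval(...) signature, and the parameter/input shapes are assumptions for illustration, not the project's documented interface:

```python
# module_summarize/function.py — a hypothetical node module.
# The eval(...) signature and argument shapes are illustrative assumptions.
class SummarizeNode:
    def eval(self, parameters, inputs):
        # parameters would come from workflow.json; inputs are upstream node outputs
        text = " ".join(inputs.values())
        limit = parameters.get("max_words", 50)
        return " ".join(text.split()[:limit])
```

workflow.json would then reference this module directory in its graph and supply max_words under the node's parameters.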

🛠 DSL Registry

  • REST API to register, retrieve, update, delete, and query workflows
  • MongoDB-backed schema with versioning and DAG structure
  • Fully documented schema and API (see docs/)

🧪 DSLExecutor SDK

  • Resource-isolated multiprocessing executor
  • Addons support: LLMs, callbacks, webhooks
  • Persistent state and output tracking
  • Full Python API (execute, get_task_output, persist_outputs...)

โ˜๏ธ DSL Creator Server

  • Upload ZIP with workflow.json and module_*/
  • Server unpacks, uploads modules to S3, rewrites codePath
  • Sends final DSL to Registry using WorkflowsClient
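
On the client side, assembling a ZIP in this layout needs only the standard library. A minimal sketch, assuming just the structure described above (workflow.json plus module_*/function.py), not the project's packaging tool:

```python
# Sketch: building a DSL ZIP bundle in the layout described above
# (workflow.json + module_*/function.py). Contents are illustrative.
import io
import json
import zipfile

def build_bundle(workflow, modules):
    """workflow: dict serialized to workflow.json; modules: dir name -> function.py source."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("workflow.json", json.dumps(workflow, indent=2))
        for name, source in modules.items():
            zf.writestr(f"{name}/function.py", source)
    return buf.getvalue()
```

The resulting bytes are what gets POSTed to the Creator Server's upload endpoint.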

⚙️ DSL Executor Infra APIs

  • Provision executors using REST
  • Kubernetes deployment + Ambassador ingress
  • Execute tasks via HTTP or WebSocket

🗂️ Key APIs

| Endpoint | Purpose |
| --- | --- |
| GET /workflows | List all workflows |
| GET /workflows/:workflow_id | Get workflow by ID |
| POST /workflows | Create a new DSL workflow |
| PUT /workflows/:workflow_id | Update workflow |
| DELETE /workflows/:workflow_id | Delete workflow |
| POST /workflows/query | Query with dynamic filters |
| POST /uploadWorkflow | Upload ZIP archive for registration |
| POST /dsl-executor/<id>/create-infra | Provision an executor |
| DELETE /dsl-executor/<id>/remove-infra | Remove executor infra |
| POST /dsl-executor/<id>/execute_dsl | Run a DSL task |
| POST /dsl-graph/<id>/estimate | Estimate resources for a DSL graph |
| POST /dsl-graph/<id>/deploy | Deploy an ad hoc DSL graph |
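
These endpoints need no special client. A minimal sketch with the standard library, where the base URL is an assumption (a hypothetical local deployment):

```python
# Sketch: building requests for the registry endpoints listed above.
# BASE_URL is a hypothetical local deployment; send with request.urlopen(...).
import json
from urllib import request

BASE_URL = "http://localhost:5000"  # assumption, not a documented default

def registry_request(method, path, body=None):
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return request.Request(
        f"{BASE_URL}{path}",
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )

# e.g. request.urlopen(registry_request("GET", "/workflows"))
```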

📢 Communications

  1. 📧 Email: community@opencyberspace.org
  2. 💬 Discord: OpenCyberspace
  3. 🐦 X (Twitter): @opencyberspace

🤝 Join Us!

AIGrid is community-driven. Theory, protocol, implementations: all contributions are welcome.

Get Involved