DSL Execution & Workflow Orchestration System
A declarative, distributed system for defining, registering, and executing graph-based AI workflows using Python modules.
The system is modular, enforces per-task resource limits, and supports human-in-the-loop intervention and remote execution.
Project Status
- Alpha: This project is in active development and subject to rapid change.
- Testing Phase: Features are experimental; expect bugs, incomplete functionality, and breaking changes.
- Not Production-Ready: We do not recommend using this in production (or relying on it) right now.
- Compatibility: APIs, schemas, and configuration may change without notice.
- Feedback Welcome: Early feedback helps us stabilize future releases.
Highlights
Modular Workflow Architecture
- Define and package DSL workflows using versioned, ZIP-based bundles
- DAG-based or router-style execution using Python classes as modular nodes
- Reusable modules with per-node settings, parameters, and requirements
- Supports both simple DAGs and advanced routing logic (loops, conditions)
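The router-style execution listed above (loops and conditions) can be pictured with a small sketch. This is an illustration only, not the project's executor: the node classes, the convention that `eval` returns the next node name together with the updated state, and the `run_router` helper are all assumptions made for this example.

```python
from typing import Dict, Optional, Tuple

State = Dict[str, int]


class Increment:
    """Hypothetical node: add 1 to the counter, then route to 'check'."""

    def eval(self, state: State) -> Tuple[Optional[str], State]:
        return "check", {**state, "count": state["count"] + 1}


class Check:
    """Hypothetical node: loop back until the counter reaches the target."""

    def eval(self, state: State) -> Tuple[Optional[str], State]:
        if state["count"] < state["target"]:
            return "increment", state
        return None, state  # None means "stop"


def run_router(nodes, start: str, state: State, max_steps: int = 100) -> State:
    """Follow the transitions chosen by each node until one returns None."""
    current: Optional[str] = start
    steps = 0
    while current is not None:
        if steps >= max_steps:  # guard against accidental infinite loops
            raise RuntimeError("router exceeded max_steps")
        current, state = nodes[current].eval(state)
        steps += 1
    return state


NODES = {"increment": Increment(), "check": Check()}
final_state = run_router(NODES, "increment", {"count": 0, "target": 3})
```

Unlike a plain DAG, the next node here is decided at runtime, which is what makes loops and conditional branches possible.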
Intelligent Execution Engine
- DSLExecutor SDK for local, multi-process, resource-limited task execution
- Addons support for LLMs, webhooks, and callbacks
- Task-level CPU, memory, and time enforcement
- Human intervention support for manual decision points
Registry and Infra Integration
- REST APIs to register, update, query, and delete DSL workflows
- Workflow archives are uploaded and unpacked via the Creator Server
- Kubernetes-backed infra provisioning for executor deployment
- WebSocket and HTTP APIs to run workflows or execute individual nodes
Use Cases
| Use Case | What It Solves |
| --- | --- |
| AI Pipeline Orchestration | Manage modular Python logic as reusable nodes in a DAG or conditional graph |
| Graph-based DSL Execution | Execute DSL workflows declaratively with clean separation of logic |
| Remote + Local Workflows | Load, prepare, and run DSLs across distributed clusters |
| Interactive Workflows | Integrate human feedback mid-flow using structured intervention logic |
| Versioned Modular Nodes | Define, test, and package Python node logic for reuse and distribution |
Integrations
| Component | Purpose |
| --- | --- |
| MongoDB | Metadata storage for registered DSLs |
| Flask | Creator and Registry API servers |
| Redis | Workflow state management and output collection |
| S3/Ceph | Workflow ZIP bundle and module storage |
| Kubernetes | Dynamic provisioning of DSL Executors |
| AddonsManager | LLM, webhook, and callback integration |
| Multiprocessing | Task-level parallelism with resource control |
Why Use This?
| Problem | Our Solution |
| --- | --- |
| Hard to manage modular AI workflows | DSL JSON format for workflows with reusable modules |
| Complex logic orchestration | DAG + router mode for conditional/dynamic task flows |
| No runtime controls for resources | Enforced CPU/memory/time limits per node |
| Inconsistent module packaging | Structured ZIP archive format with `workflow.json` and module folders |
| Lack of execution observability and control | State collection and Addons integration |
Project Status
- Actively maintained; currently in alpha and not yet production-ready
- Local + remote execution modes
- Integrated SDK and API layers
- Workflow versioning, packaging, and remote execution
- Community feedback and contributions welcome
DSL System Components
DSL Workflow Definition
- A DSL is defined using `workflow.json` + `module_*/` directories
- Each module contains a `function.py` with a class implementing `eval(...)`
- `workflow.json` describes the graph, settings, and parameters
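To make the relationship between the graph and the module classes concrete, here is a minimal sketch of DAG-ordered execution. It is not the project's schema or executor: the `GRAPH` dictionary is a hypothetical stand-in for the graph section of `workflow.json`, the `eval(self, inputs)` signature is assumed (the source only says `eval(...)`), and the three node classes are invented for illustration. Ordering uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+).

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


class LoadNumbers:
    """Stand-in for a class that would live in module_load/function.py."""

    def eval(self, inputs):
        return [1, 2, 3]


class Double:
    """Doubles every value produced by the upstream 'load' node."""

    def eval(self, inputs):
        return [x * 2 for x in inputs["load"]]


class Total:
    """Sums the values produced by the upstream 'double' node."""

    def eval(self, inputs):
        return sum(inputs["double"])


# Hypothetical graph description: node name -> names of nodes it depends on.
GRAPH = {"load": [], "double": ["load"], "total": ["double"]}
NODES = {"load": LoadNumbers(), "double": Double(), "total": Total()}


def run_dag(graph, nodes):
    """Run each node after its dependencies, passing upstream outputs in."""
    outputs = {}
    for name in TopologicalSorter(graph).static_order():
        upstream = {dep: outputs[dep] for dep in graph[name]}
        outputs[name] = nodes[name].eval(upstream)
    return outputs


results = run_dag(GRAPH, NODES)
```

Keeping the graph as data and the logic in per-module classes is what lets the same node be reused across workflows.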
DSL Registry
- REST API to register, retrieve, update, delete, and query workflows
- MongoDB-backed schema with versioning and DAG structure
- Fully documented schema and API (see docs/)
DSLExecutor SDK
- Resource-isolated multiprocessing executor
- Addons support: LLMs, callbacks, webhooks
- Persistent state and output tracking
- Full Python API (`execute`, `get_task_output`, `persist_outputs`, ...)
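As a rough picture of what process-level isolation with a time limit can look like, the sketch below runs a task in a child process and terminates it if it overruns. It does not use or reproduce the DSLExecutor API: `run_with_time_limit` and `square` are hypothetical names, only wall-clock time is enforced (not CPU or memory), and the `fork` start method is assumed, so this runs on Unix-like systems only.

```python
import multiprocessing as mp
import queue as queue_mod
import time

# Assumption: the "fork" start method (Unix only) keeps this sketch short.
_CTX = mp.get_context("fork")


def _worker(func, args, result_queue):
    """Run func in the child process and report the outcome to the parent."""
    try:
        result_queue.put(("ok", func(*args)))
    except Exception as exc:  # report failures instead of exiting silently
        result_queue.put(("error", repr(exc)))


def run_with_time_limit(func, args=(), time_limit_s=1.0):
    """Run func(*args) in a child process; stop it after time_limit_s seconds."""
    result_queue = _CTX.Queue()
    proc = _CTX.Process(target=_worker, args=(func, args, result_queue))
    proc.start()
    try:
        # Wait for a result first; joining before reading can block on large results.
        outcome = result_queue.get(timeout=time_limit_s)
    except queue_mod.Empty:
        proc.terminate()  # no result arrived within the limit
        proc.join()
        return ("timeout", None)
    proc.join()
    return outcome


def square(x):
    return x * x
```

For example, `run_with_time_limit(square, (4,), time_limit_s=5.0)` returns the task's result tagged with `"ok"`, while a task that sleeps past its limit is reported as `"timeout"`.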
DSL Creator Server
- Upload ZIP with `workflow.json` and `module_*/`
- Server unpacks, uploads modules to S3, rewrites `codePath`
- Sends final DSL to Registry using `WorkflowsClient`
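The bundle layout described above (`workflow.json` at the root plus one `module_*/` folder per module, each holding a `function.py`) can be assembled with the standard library. This is a minimal sketch: the `build_bundle` helper, the fields inside the example `workflow.json`, and the module source are all placeholders, not the project's actual schema.

```python
import io
import json
import zipfile


def build_bundle(workflow: dict, modules: dict) -> bytes:
    """Return ZIP bytes containing workflow.json and module_<name>/function.py files."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("workflow.json", json.dumps(workflow, indent=2))
        for name, source in modules.items():
            archive.writestr(f"module_{name}/function.py", source)
    return buffer.getvalue()


# Placeholder content; the real workflow.json schema is defined by the project.
example_workflow = {"name": "demo", "version": "0.1.0"}
example_modules = {
    "double": "class Double:\n    def eval(self, inputs):\n        return inputs\n",
}

bundle_bytes = build_bundle(example_workflow, example_modules)
```

The resulting bytes can be written to a `.zip` file and uploaded to the Creator Server.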
DSL Executor Infra APIs
- Provision executors using REST
- Kubernetes deployment + Ambassador ingress
- Execute tasks via HTTP or WebSocket
Key APIs
| Endpoint | Purpose |
| --- | --- |
| `GET /workflows` | List all workflows |
| `GET /workflows/:workflow_id` | Get workflow by ID |
| `POST /workflows` | Create a new DSL workflow |
| `PUT /workflows/:workflow_id` | Update workflow |
| `DELETE /workflows/:workflow_id` | Delete workflow |
| `POST /workflows/query` | Query with dynamic filters |
| `POST /uploadWorkflow` | Upload ZIP archive for registration |
| `POST /dsl-executor/<id>/create-infra` | Provision an executor |
| `DELETE /dsl-executor/<id>/remove-infra` | Remove executor infra |
| `POST /dsl-executor/<id>/execute_dsl` | Run a DSL task |
| `POST /dsl-graph/<id>/estimate` | Estimate resources for a DSL graph |
| `POST /dsl-graph/<id>/deploy` | Deploy an ad hoc DSL graph |
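As a hedged illustration of calling the workflow endpoints listed above, the sketch below only builds the HTTP requests with the standard library and does not send them. The base URL, the workflow ID, and the shape of the query filter are assumptions; consult the schema and API docs for the real payloads.

```python
import json
import urllib.request

# Hypothetical base URL; replace with the address of your Registry server.
BASE_URL = "http://localhost:5000"


def build_request(method: str, path: str, payload=None) -> urllib.request.Request:
    """Build (but do not send) a JSON request for one of the Registry endpoints."""
    data = None
    headers = {}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


list_req = build_request("GET", "/workflows")
# The filter body below is a guess at what a "dynamic filter" might look like.
query_req = build_request("POST", "/workflows/query", {"name": "demo"})
# "demo-workflow" is a made-up workflow ID.
delete_req = build_request("DELETE", "/workflows/demo-workflow")
```

With a server running, a request can be sent using `urllib.request.urlopen(list_req)`.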
Communications
- Email: community@opencyberspace.org
- Discord: OpenCyberspace
- X (Twitter): @opencyberspace
Join Us!
AIGrid is community-driven. Theory, protocol, implementations: all contributions are welcome.
Get Involved