Nodes

Nodes are the individual steps in a workflow. Each node has a unique name, a type, type-specific configuration, and optional input mappings that connect it to earlier steps.

Node structure

nodes:
  - name: "fetch_data"      # Unique identifier
    type: "http/request"    # Node type
    config:                 # Type-specific config
      url: "https://api.example.com"
      method: GET
    input:                  # Optional input mapping
      query: "{{ trigger.body.query }}"
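Because each node's name must be unique, a loader would typically reject workflows that repeat one. The following is a minimal sketch of such a check, not the engine's actual validation: `NodeDef` and `first_duplicate_name` are hypothetical names introduced here, and only the `name` and `type` fields from the structure above are modeled.

```rust
use std::collections::HashSet;

/// Hypothetical in-memory form of one entry under `nodes:`.
/// Only the fields needed for this sketch are modeled.
#[allow(dead_code)]
struct NodeDef {
    name: String,      // unique identifier within the workflow
    node_type: String, // e.g. "http/request"
}

/// Returns the first node name that appears more than once, if any.
fn first_duplicate_name(nodes: &[NodeDef]) -> Option<&str> {
    let mut seen = HashSet::new();
    nodes
        .iter()
        .map(|node| node.name.as_str())
        // `insert` returns false when the name was already present.
        .find(|name| !seen.insert(*name))
}
```

A caller could run this once after parsing and report the offending name before any node executes.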

Built-in types

Core nodes

Type            Purpose
core/log        Log to stdout
core/condition  Branch based on condition
core/sleep      Delay execution
core/template   Transform data

Data nodes

Type            Purpose
postgres/query  PostgreSQL operations
redis/get       Redis read
redis/set       Redis write

HTTP nodes

Type          Purpose
http/request  Generic HTTP call
http/webhook  Return response

Message nodes

Type           Purpose
slack/post     Send Slack message
discord/post   Send Discord message
kafka/produce  Publish to Kafka

Custom nodes

Build nodes in Rust:

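use r8r_sdk::{Node, Context, Result};
// Assumption: Value and json! come from serde_json; the original snippet omits this import.
use serde_json::{json, Value};

#[derive(Node)]
#[node(name = "custom/hello")]
struct HelloNode {
    name: String,
}

impl Node for HelloNode {
    // The context is unused in this example, hence the underscore prefix.
    async fn execute(&self, _ctx: Context) -> Result<Value> {
        Ok(json!({ "message": format!("Hello, {}!", self.name) }))
    }
}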

Register in r8r.toml:

[nodes]
custom = "./nodes"

Input/output

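Nodes communicate via JSON. Each node's output is a JSON value, and later nodes reference it by node name in a template expression: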

nodes:
  - name: "fetch"
    type: "http/request"
    # Outputs: { "status": 200, "body": {...} }
  - name: "process"
    type: "core/template"
    input: "{{ fetch.body.items }}"
    # Receives the items array
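To illustrate how a reference such as `{{ fetch.body.items }}` could be resolved, here is a minimal sketch that treats the first path segment as a node name and the remaining segments as object keys. It is an illustration under stated assumptions, not the engine's implementation: the `Json` enum, `parse_reference`, and `resolve` are hypothetical, and the real template syntax may support more than plain dotted paths.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a JSON value, kept small for the sketch.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Number(f64),
    Text(String),
    Array(Vec<Json>),
    Object(HashMap<String, Json>),
}

/// Extracts the dotted path from a `{{ path }}` expression.
/// Returns None when the text is not wrapped in `{{` and `}}` or is empty inside.
fn parse_reference(expr: &str) -> Option<Vec<&str>> {
    let inner = expr.trim().strip_prefix("{{")?.strip_suffix("}}")?.trim();
    if inner.is_empty() {
        return None;
    }
    Some(inner.split('.').collect())
}

/// Looks up a path in the outputs recorded so far, keyed by node name.
/// The first segment selects the node; each later segment selects an object key.
fn resolve<'a>(outputs: &'a HashMap<String, Json>, path: &[&str]) -> Option<&'a Json> {
    let (node, keys) = path.split_first()?;
    let mut current = outputs.get(*node)?;
    for key in keys {
        current = match current {
            Json::Object(map) => map.get(*key)?,
            // A key lookup on a non-object value cannot succeed.
            _ => return None,
        };
    }
    Some(current)
}
```

Returning `Option` rather than panicking lets the caller decide whether a missing path is an error or simply an empty input.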

Error handling

Error handling is configured per node with an on_error block:

nodes:
  - name: "api_call"
    type: "http/request"
    config: { ... }
    on_error:
      action: retry
      max_attempts: 3
      delay: 5s
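The retry settings above can be read as a simple loop. The sketch below shows one plausible interpretation, assuming that `max_attempts` counts every attempt including the first and that `delay` is a fixed pause between attempts; the engine's actual semantics may differ (for example, it may use backoff). `RetryPolicy` and `run_with_retry` are hypothetical names.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical mirror of the `on_error` fields for `action: retry`.
struct RetryPolicy {
    max_attempts: u32, // assumed to include the first attempt
    delay: Duration,   // assumed fixed pause between attempts
}

/// Calls `op` until it succeeds or `max_attempts` is reached, sleeping
/// `delay` between attempts. `op` receives the 1-based attempt number.
/// The operation always runs at least once; the last error is returned
/// when all attempts fail.
fn run_with_retry<T, E>(
    policy: &RetryPolicy,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            Err(err) if attempt >= policy.max_attempts => return Err(err),
            Err(_) => {
                sleep(policy.delay);
                attempt += 1;
            }
        }
    }
}
```

With `max_attempts: 3` and `delay: 5s`, this interpretation would make at most three calls and wait at most ten seconds in total between them.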