- Restate provides durable execution: code automatically stores completed steps and resumes from where it left off on failures
- All handlers receive a Context/ObjectContext/WorkflowContext/ObjectSharedContext/WorkflowSharedContext object as the first argument
- Handlers can take typed inputs and return typed outputs using Python type hints and Pydantic models
import restate
my_service = restate.Service("MyService")


@my_service.handler("myHandler")
async def my_handler(ctx: restate.Context, greeting: str) -> str:
    """Return the greeting followed by an exclamation mark."""
    # f-strings interpolate with {name}; a leading "$" would be emitted literally.
    return f"{greeting}!"
app = restate.app([my_service])

import restate
my_object = restate.VirtualObject("MyVirtualObject")


@my_object.handler("myHandler")
async def my_handler(ctx: restate.ObjectContext, greeting: str) -> str:
    """Return the greeting combined with the key of the invoked object."""
    # f-strings interpolate with {expr}; a leading "$" would be emitted literally.
    return f"{greeting} {ctx.key()}!"
@my_object.handler(kind="shared")
async def my_concurrent_handler(ctx: restate.ObjectSharedContext, greeting: str) -> str:
    """Shared handler: return the greeting combined with the object's key."""
    # f-strings interpolate with {expr}; a leading "$" would be emitted literally.
    return f"{greeting} {ctx.key()}!"
app = restate.app([my_object])

import restate
my_workflow = restate.Workflow("MyWorkflow")


@my_workflow.main()
async def run(ctx: restate.WorkflowContext, req: str) -> str:
    """Main workflow handler; this placeholder just returns "success"."""
    # ... implement workflow logic here ...
    return "success"
@my_workflow.handler()
async def interact_with_workflow(ctx: restate.WorkflowSharedContext, req: str) -> None:
    """Additional workflow handler registered next to the main one; returns nothing."""
    # ... implement interaction logic here ...
    return
app = restate.app([my_workflow])

❌ Never use global variables - not durable, lost across replicas.
✅ Use ctx.get() and ctx.set() - durable and scoped to the object's key.
# Get state
count = await ctx.get("count", type_hint=int) or 0
# Set state
ctx.set("count", count + 1)
# Clear state
ctx.clear("count")
ctx.clear_all()
# Get all state keys
keys = ctx.state_keys()

# Call a Service
response = await ctx.service_call(my_handler, "Hi")
# Call a Virtual Object
response2 = await ctx.object_call(my_object_handler, key="object-key", arg="Hi")
# Call a Workflow
response3 = await ctx.workflow_call(run, "wf-id", arg="Hi")

ctx.service_send(my_handler, "Hi")
ctx.object_send(my_object_handler, key="object-key", arg="Hi")
ctx.workflow_send(run, "wf-id", arg="Hi")

ctx.service_send(
my_handler,
"Hi",
send_delay=timedelta(hours=5)
)

Call a service without using the generated client, using only string names.
response = await ctx.generic_call(
"MyObject", "my_handler", key="Mary", arg=json.dumps("Hi").encode("utf-8")
)

response = await ctx.service_call(
my_service.my_handler,
"Hi",
idempotency_key="my-key"
)

❌ Never call external APIs/DBs directly - will re-execute during replay, causing duplicates.
✅ Wrap in ctx.run() or ctx.run_typed() - Restate journals the result; runs only once.
# Wrap non-deterministic code in ctx.run
result = await ctx.run_typed("my-side-effect", call_external_api, query="weather", some_id="123")
# Or with typed version for better type safety
result = await ctx.run_typed("my-side-effect", call_external_api)

❌ Never use random.random() - non-deterministic and breaks replay logic.
✅ Use ctx.random() or ctx.uuid4() - Restate journals the result for deterministic replay.
❌ Never use time.time(), datetime.now() - returns different values during replay.
✅ Use ctx.now() - Restate records and replays the same timestamp.
❌ Never use asyncio.sleep() or time.sleep() - not durable, lost on restarts.
✅ Use ctx.sleep() - durable timer that survives failures.
# Sleep
await ctx.sleep(timedelta(seconds=30))
# Schedule delayed call (different from sleep + send)
ctx.service_send(
my_handler,
"Hi",
send_delay=timedelta(hours=5)
)

# Create awakeable
awakeable_id, promise = ctx.awakeable(type_hint=str)
# Send ID to external system
await ctx.run_typed("request_human_review", request_human_review, name=name, awakeable_id=awakeable_id)
# Wait for result
review = await promise
# Resolve from another handler
ctx.resolve_awakeable(awakeable_id, "Looks good!")
# Reject from another handler
ctx.reject_awakeable(awakeable_id, "Cannot be reviewed")

# Wait for promise
review = await ctx.promise("review").value()
# Resolve promise
await ctx.promise("review").resolve("approval")

Always use Restate combinators (restate.gather, restate.select) instead of Python's native asyncio methods - they journal execution order for deterministic replay.
restate.gather returns when all futures complete. Use it to wait for multiple operations to finish.
# ❌ BAD
results1 = await asyncio.gather(call1(), call2())
# ✅ GOOD
claude_call = ctx.service_call(ask_claude, "What is the weather?")
openai_call = ctx.service_call(ask_openai, "What is the weather?")
results2 = await restate.gather(claude_call, openai_call)

restate.select returns immediately when the first future completes. Use it for timeouts and racing operations.
# ❌ BAD
result1 = await asyncio.wait([call1(), call2()], return_when=asyncio.FIRST_COMPLETED)
# ✅ GOOD
confirmation = ctx.awakeable(type_hint=str)
match await restate.select(
confirmation=confirmation[1],
timeout=ctx.sleep(timedelta(days=1))
):
case ["confirmation", result]:
print("Got confirmation:", result)
case ["timeout", _]:
raise restate.TerminalError("Timeout!")# Send a request, get the invocation id
handle = ctx.service_send(
my_handler, arg="Hi", idempotency_key="my-idempotency-key"
)
invocation_id = await handle.invocation_id()
# Now re-attach
result = await ctx.attach_invocation(invocation_id)
# Cancel invocation
ctx.cancel_invocation(invocation_id)

By default, the Python SDK uses built-in JSON support with type hints.
For type safety and validation with Pydantic:
import restate
from pydantic import BaseModel
from restate.serde import Serde
class Greeting(BaseModel):
    """Input payload of the greet handler."""

    name: str
class GreetingResponse(BaseModel):
    """Output payload of the greet handler."""

    result: str
greeter = restate.Service("Greeter")


@greeter.handler()
async def greet(ctx: restate.Context, greeting: Greeting) -> GreetingResponse:
    """Build a response that echoes the name from the incoming greeting."""
    return GreetingResponse(result=f"You said hi to {greeting.name}!")


class MyData(typing.TypedDict):
    """Example payload that is (de)serialized by a custom serde."""

    some_value: str
    my_number: int
class MySerde(Serde[MyData]):
    """Custom serde that stores MyData as UTF-8 encoded JSON.

    The ``my_number`` field is written under the JSON key ``"some_number"``;
    both directions use the same key.
    """

    def deserialize(self, buf: bytes) -> typing.Optional[MyData]:
        """Decode JSON bytes into MyData; an empty buffer yields None."""
        if not buf:
            return None
        data = json.loads(buf)
        return MyData(some_value=data["some_value"], my_number=data["some_number"])

    def serialize(self, obj: typing.Optional[MyData]) -> bytes:
        """Encode MyData as JSON bytes; None yields an empty byte string."""
        if obj is None:
            return bytes()
        data = {"some_value": obj["some_value"], "some_number": obj["my_number"]}
        return bytes(json.dumps(data), "utf-8")
# For the input/output serialization of your handlers
@my_object.handler(input_serde=MySerde(), output_serde=MySerde())
async def my_handler(ctx: restate.ObjectContext, greeting: str) -> str:
# To serialize state
await ctx.get("my_state", serde=MySerde())
ctx.set("my_state", MyData(some_value="Hi", my_number=15), serde=MySerde())
# To serialize awakeable payloads
ctx.awakeable(serde=MySerde())
# etc.
return "some-output"Restate retries failures indefinitely by default. For permanent business-logic failures (invalid input, declined payment), use TerminalError to stop retries immediately.
from restate import TerminalError
raise TerminalError("Invalid input - will not retry")

# Any other thrown error will be retried
raise Exception("Temporary failure - will retry")

Install the test harness with pip install restate_sdk[harness]
import restate
from src.develop.my_service import app
# Start the app inside the test harness and call a handler through its ingress client.
with restate.test_harness(app) as harness:
    restate_client = harness.ingress_client()
    print(restate_client.post("/greeter/greet", json="Alice").json())