Skip to content

Commit 4fc7d31

Browse files
committed
Add migration guide for adapting LangGraph Functional API to Temporal
Documents key changes required: - Use await instead of .result() for task calls - Entrypoints must be async functions - Tasks must be at module level (importable) - Complete before/after migration example - Summary table of changes
1 parent c15f496 commit 4fc7d31

1 file changed

Lines changed: 147 additions & 0 deletions

File tree

langgraph_plugin/README.md

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,3 +218,150 @@ plugin = LangGraphFunctionalPlugin(
218218
}
219219
)
220220
```
221+
222+
### Adapting LangGraph Functional API for Temporal
223+
224+
When adapting a standard LangGraph functional API graph to run with Temporal, there are a few key changes required:
225+
226+
#### 1. Use `await` When Calling Tasks
227+
228+
In standard LangGraph, you can call tasks with `.result()`:
229+
230+
```python
231+
# Standard LangGraph (won't work with Temporal)
232+
@entrypoint()
233+
def my_workflow(input: str) -> dict:
234+
result = my_task(input).result() # ❌ Blocks - not allowed in Temporal
235+
return {"output": result}
236+
```
237+
238+
With Temporal, you must use `await` because blocking is not allowed in workflows:
239+
240+
```python
241+
# Temporal-compatible
242+
@entrypoint()
243+
async def my_workflow(input: str) -> dict:
244+
result = await my_task(input) # ✅ Async await
245+
return {"output": result}
246+
```
247+
248+
#### 2. Entrypoints Must Be Async
249+
250+
Since tasks require `await`, your entrypoint function must be `async`:
251+
252+
```python
253+
# Standard LangGraph
254+
@entrypoint()
255+
def process(data: str) -> dict: # sync function
256+
...
257+
258+
# Temporal-compatible
259+
@entrypoint()
260+
async def process(data: str) -> dict: # async function
261+
...
262+
```
263+
264+
#### 3. Tasks Must Be Importable
265+
266+
Tasks run as Temporal activities in a separate context. They must be defined at module level (not inside functions or classes) so they can be imported by the worker:
267+
268+
```python
269+
# ✅ Correct - module level
270+
@task
271+
def analyze(text: str) -> str:
272+
return llm.invoke(text)
273+
274+
# ❌ Wrong - can't be imported
275+
def make_workflow():
276+
@task
277+
def analyze(text: str) -> str: # Closure - not importable
278+
return llm.invoke(text)
279+
```
280+
281+
#### 4. Parallel Task Execution
282+
283+
Standard LangGraph parallel execution works the same way, just use `await`:
284+
285+
```python
286+
@entrypoint()
287+
async def research(topic: str) -> dict:
288+
# Start multiple tasks concurrently
289+
futures = [search(query) for query in queries]
290+
291+
# Wait for all results
292+
results = [await f for f in futures]
293+
294+
return {"results": results}
295+
```
296+
297+
#### Complete Migration Example
298+
299+
**Before (Standard LangGraph):**
300+
301+
```python
302+
from langgraph.func import task, entrypoint
303+
304+
@task
305+
def fetch_data(url: str) -> dict:
306+
return requests.get(url).json()
307+
308+
@task
309+
def process_data(data: dict) -> str:
310+
return transform(data)
311+
312+
@entrypoint()
313+
def pipeline(url: str) -> dict:
314+
data = fetch_data(url).result() # Blocking call
315+
output = process_data(data).result() # Blocking call
316+
return {"output": output}
317+
318+
# Direct invocation
319+
result = pipeline.invoke("https://api.example.com")
320+
```
321+
322+
**After (Temporal-compatible):**
323+
324+
```python
325+
from langgraph.func import task, entrypoint
326+
from temporalio import workflow
327+
from temporalio.contrib.langgraph import LangGraphFunctionalPlugin, compile_functional
328+
329+
@task
330+
def fetch_data(url: str) -> dict:
331+
return requests.get(url).json()
332+
333+
@task
334+
def process_data(data: dict) -> str:
335+
return transform(data)
336+
337+
@entrypoint()
338+
async def pipeline(url: str) -> dict: # Made async
339+
data = await fetch_data(url) # Use await
340+
output = await process_data(data) # Use await
341+
return {"output": output}
342+
343+
# Workflow wrapper
344+
@workflow.defn
345+
class PipelineWorkflow:
346+
@workflow.run
347+
async def run(self, url: str) -> dict:
348+
app = compile_functional("pipeline")
349+
return await app.ainvoke(url)
350+
351+
# Plugin registration
352+
plugin = LangGraphFunctionalPlugin(entrypoints={"pipeline": pipeline})
353+
354+
# Worker setup
355+
async with Worker(client, task_queue="q", workflows=[PipelineWorkflow], plugins=[plugin]):
356+
result = await client.execute_workflow(PipelineWorkflow.run, "https://api.example.com", ...)
357+
```
358+
359+
#### Summary of Changes
360+
361+
| Aspect | Standard LangGraph | Temporal Integration |
362+
|--------|-------------------|---------------------|
363+
| Task calls | `task(x).result()` | `await task(x)` |
364+
| Entrypoint | `def func():` (sync OK) | `async def func():` (must be async) |
365+
| Task location | Anywhere | Module level only |
366+
| Invocation | `entrypoint.invoke(x)` | `compile_functional("id").ainvoke(x)` |
367+
| Execution | Direct | Via Temporal workflow |

0 commit comments

Comments
 (0)