vineetjaiswal
Simplifying GenAI: Development to observability (OpenSource)

An open-source toolkit for building GenAI applications with agentic workflows and end-to-end, enterprise-grade observability.

A very high-level view with key component details:

high level architecture

LangChain: Most popular framework for developing applications powered by LLMs, offering chains to connect various components.

LangGraph: A library for building stateful, multi-actor applications with LLMs, used to create agent and multi-agent workflows.

Langfuse: An observability tool for LLM-powered applications, providing detailed traces and performance insights.

Let's understand it with an example. Suppose we need to implement this problem statement:

“Create a system that classifies user input as sports or non-sports. Based on the classification, provide detailed sports info or relevant general knowledge tailored to the query.”

Implementation

Assuming you're already familiar with GenAI development, you're now looking for an open-source ecosystem that lowers your production costs, and you also need to trace all LLM calls for future use.

Let's jump directly into the code and understand the ecosystem as the flow unfolds. Here are the key steps.

1. Graph creation using LangGraph

from langgraph.graph import StateGraph, END

graph = StateGraph(AgentState)
# StateGraph holds the agent state; the entire LangGraph execution is driven by it

# Add nodes (agents)
graph.add_node("classification_agent", classification_agent)
graph.add_node("sports_agent", sports_agent)
graph.add_node("non_sports_agent", non_sports_agent)

# Set the entry point of the graph to the classification agent
graph.set_entry_point("classification_agent")

# Add conditional edges (_router is explained in a later section)
graph.add_conditional_edges(
    "classification_agent",
    _router,  # Routing function that determines the next agent
    {"sports_agent": "sports_agent", "non_sports_agent": "non_sports_agent"},
    # Mapping of classification results to agent nodes
)

# Define the end points
graph.add_edge("sports_agent", END)
graph.add_edge("non_sports_agent", END)

app = graph.compile()  # Compile the graph into an executable application

# When you want to execute it
output_data = app.invoke(inputs)  # Invoke the application with the prepared inputs
final_success_message = output_data.get("messages")[-1]  # Retrieve the final message from the agent state
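The snippet above assumes inputs has already been prepared. A minimal sketch of what that preparation might look like (the question text is just an illustration):

from langchain_core.messages import HumanMessage

# The graph expects the same shape as the AgentState described below: a list of messages
inputs = {"messages": [HumanMessage(content="Who won the last football world cup?")]}
output_data = app.invoke(inputs)
print(output_data["messages"][-1].content)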
  • The state is what actually gets passed to each agent, and it holds the entire execution state. This is the AgentState class:
import operator
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    # operator.add tells LangGraph to append each agent's messages rather than overwrite them
    messages: Annotated[Sequence[BaseMessage], operator.add]
  • Edges and nodes are conventional graph concepts
  • _router defines the conditional edge; in general, it handles all the routing among agents. It is simple code that just returns the last message's content (a more defensive variant is sketched after this list):
def _router(state):
    messages = state["messages"]
    last_message = messages[-1]
    return last_message.content
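One caveat: the value _router returns must exactly match a key in the conditional-edge mapping, and LLM replies can vary in wording. A slightly more defensive variant (my own sketch, not from the repo) normalizes the reply first:

def _robust_router(state):
    # Normalize the classifier's reply before matching it against node names
    content = state["messages"][-1].content.strip().lower()
    # Check the longer label first, since "sports_agent" is a substring of "non_sports_agent"
    if "non_sports_agent" in content:
        return "non_sports_agent"
    return "sports_agent"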

The simplified graph:

(image: the simplified graph)

2. Agent creation with LangChain

A simple agent looks like this (without Langfuse for now):

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

def classification_agent(state):
    messages = state.get("messages")
    question = messages[-1]
    print(f"inside classification_agent {question}")
    llm = ChatOpenAI(temperature=0.7)
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "you are an AI bot who can classify"),
            (
                "user",
                "classify input as sports_agent if related to sports, or non_sports_agent if not related to the sports category, input: {question}",
            ),
        ]
    )
    retrieval_chain = {"question": RunnablePassthrough()} | prompt | llm
    output_data = retrieval_chain.invoke(
        {"question": question},
    )
    print(f"classification_agent output: {output_data}")
    return {"messages": [output_data]}

def sports_agent(state):
    messages = state.get("messages")
    question = messages[0]
    llm = ChatOpenAI(temperature=0.7)
    print(f"inside sports_agent {question}")
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "you are an AI bot who can generate more details"),
            ("user", "Give me the top 10 world cup winning teams in JSON format for: {question}"),
        ]
    )
    retrieval_chain = {"question": RunnablePassthrough()} | prompt | llm
    output_data = retrieval_chain.invoke(
        {"question": question},
    )

    return {"messages": [output_data]}

def non_sports_agent(state):
    messages = state.get("messages")
    question = messages[0]
    llm = ChatOpenAI(temperature=0.7)
    print(f"inside non_sports_agent {question}")  # fixed: was printing the function object
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "you are an AI bot who can generate more details"),
            ("user", "give me more detail from wikipedia for: {question}"),
        ]
    )
    retrieval_chain = {"question": RunnablePassthrough()} | prompt | llm
    output_data = retrieval_chain.invoke(
        {"question": question},
    )
    return {"messages": [output_data]}

Very simple LangChain-based agents: one for classification and the rest for the specific agentic workflows.
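If you want to sanity-check a single agent outside the graph, a quick standalone call (my own snippet; assumes OPENAI_API_KEY is set) could look like:

from langchain_core.messages import HumanMessage

state = {"messages": [HumanMessage(content="Who won the cricket world cup in 2011?")]}
result = classification_agent(state)
print(result["messages"][-1].content)  # expected to contain "sports_agent"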

3. Integration with Langfuse

Assuming you have already set up Langfuse and have access to its dashboard, which looks like this:

(image: Langfuse dashboard)

There are two ways to integrate it; you can use either or both. Here are examples.

3.1 With the decorator method

You need to put Python decorators on your methods, like this:

from langfuse.decorators import observe, langfuse_context

@observe()  # over the main thread, or one logical unit of operation
@observe(as_type="generation")  # where you actually generate something

You can also extend it like this (code inside the method):

langfuse_context.update_current_observation(
    usage={
        "input": response.usage.input_tokens,
        "output": response.usage.output_tokens,
    }
)
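Putting it together, here is a minimal sketch of the decorator method (the function names are my own illustration; assumes the LANGFUSE_* environment variables are configured):

from langfuse.decorators import observe
from langchain_openai import ChatOpenAI

@observe(as_type="generation")  # the span that actually calls the LLM
def classify(question: str) -> str:
    llm = ChatOpenAI(temperature=0.7)
    response = llm.invoke(f"Classify as sports_agent or non_sports_agent: {question}")
    return response.content

@observe()  # the outer trace: one logical unit of operation
def handle_request(question: str) -> str:
    return classify(question)

print(handle_request("Who won the 2022 football world cup?"))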

The output looks like this: the entire trace of the request, with all input/output details, as below.

langfuse with decorator

3.2 With CallbackHandler

You need to define a callback handler like this:

from langfuse.callback import CallbackHandler

def create_handler(user_identifier, trace_name):
    langfuse_handler = CallbackHandler(
        session_id=user_identifier,
        metadata={"a": "b"},
        trace_name=trace_name,
    )
    return langfuse_handler

While calling the agents, add some more metadata and pass the handler along with the LLM chains. A complete example:

def non_sports_agent_generation(llm, prompt, question, langfuse_handler):
    retrieval_chain = {"question": RunnablePassthrough()} | prompt | llm
    output_data = retrieval_chain.invoke(
        {"question": question},
        config={
            "callbacks": [langfuse_handler],
        },
    )

    return output_data
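Since the compiled graph is itself a runnable, you can also pass the handler once at the top level, so every chain call in the run is traced under one session (my own sketch, not from the repo):

handler = create_handler("user-123", "sports-classification")
output_data = app.invoke(inputs, config={"callbacks": [handler]})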

You get more detailed and formatted observability for such handlers, like this:

Langfuse with handler

You can find the entire code here: https://github.com/jaiswalvineet/langchain_langgraph_langfuse

The entire execution looks like this:

(image: end-to-end execution)

Conclusion

With all the above implementations, you can build a complete open-source platform ready for enterprise-grade adoption.

To keep things easy to understand for beginners, I've used simple examples, which may not be the most optimized, so please review the details carefully before using them. Also, I used the paid OpenAI LLM for simplicity, but you can use any open-source LLM, such as Llama.
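For example, swapping in a local open-source model is usually a one-line change (this assumes the langchain-ollama package and a running Ollama server, which are not part of this post's repo):

from langchain_ollama import ChatOllama

# Drop-in replacement for ChatOpenAI in any of the agents above
llm = ChatOllama(model="llama3", temperature=0.7)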

Comments and feedback are most welcome. Let me know if you'd like further refinements!
