
Prompt Flow Improvements: Building Production AI Pipelines

Prompt Flow has evolved significantly since its introduction. Today I’m exploring the latest improvements for building production-ready AI pipelines.

What is Prompt Flow?

Prompt Flow is a development framework for building LLM-based applications. It provides:

  • Visual flow builder - DAG-based pipeline design
  • Prompty format - Portable prompt definitions
  • Local development - Test without cloud resources
  • Evaluation tools - Built-in quality assessment
  • Deployment options - Multiple hosting choices

Setting Up Prompt Flow

# Install Prompt Flow
pip install promptflow promptflow-tools

# Install additional providers
pip install promptflow-azure  # Azure integration

# Verify installation
pf --version
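
If you prefer starting from a template, the CLI can also scaffold a flow directory for you. A quick sketch (the flow name here is just an example):

# Scaffold a new standard flow with a sample flow.dag.yaml
pf flow init --flow ./rag_flow --type standard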

Creating Flows

Standard Flow

# flow.dag.yaml
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json

inputs:
  question:
    type: string
    default: "What is machine learning?"

outputs:
  answer:
    type: string
    reference: ${generate_answer.output}

nodes:
  - name: retrieve_context
    type: python
    source:
      type: code
      path: retrieve.py
    inputs:
      question: ${inputs.question}

  - name: generate_answer
    type: llm
    source:
      type: code
      path: generate.prompty
    inputs:
      context: ${retrieve_context.output}
      question: ${inputs.question}
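
With the DAG defined, a quick smoke test from the CLI looks like this (assuming the flow lives in ./rag_flow, as in the later examples):

# Run the flow once with a single input
pf flow test --flow ./rag_flow --inputs question="What is machine learning?"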

Python Node

# retrieve.py
from promptflow import tool
from azure.search.documents import SearchClient
from azure.core.credentials import AzureKeyCredential
import os

@tool
def retrieve_context(question: str) -> str:
    """Retrieve relevant documents from Azure AI Search."""
    client = SearchClient(
        endpoint=os.environ["SEARCH_ENDPOINT"],
        index_name="documents",
        credential=AzureKeyCredential(os.environ["SEARCH_KEY"])
    )

    results = client.search(
        search_text=question,
        top=5,
        select=["content", "title"]
    )

    context_parts = []
    for result in results:
        context_parts.append(f"Title: {result['title']}\n{result['content']}")

    return "\n\n---\n\n".join(context_parts)

Prompty Node

# generate.prompty
---
name: Answer Generator
description: Generate answers using retrieved context
model:
  api: chat
  configuration:
    type: azure_openai
    azure_deployment: gpt-4o
    api_version: 2024-05-01-preview
  parameters:
    temperature: 0.7
    max_tokens: 500
    top_p: 0.95
inputs:
  context:
    type: string
    description: Retrieved document context
  question:
    type: string
    description: User's question
outputs:
  answer:
    type: string
---
system:
You are a helpful assistant that answers questions based on provided context.
Only use information from the context. If the answer isn't in the context, say so.

user:
Context:
{{context}}

Question: {{question}}

Provide a clear, concise answer:
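
One benefit of the Prompty format is that a .prompty file can be loaded and called on its own, outside the DAG. A minimal sketch, assuming a recent promptflow-core and a configured Azure OpenAI connection:

from promptflow.core import Prompty

# Load the prompty file as a callable
generate = Prompty.load(source="generate.prompty")

answer = generate(
    context="Azure AI Studio is Microsoft's platform for building AI apps.",
    question="What is Azure AI Studio?"
)
print(answer)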

Connection Management

import os

from promptflow import PFClient
from promptflow.entities import AzureOpenAIConnection, CognitiveSearchConnection

pf = PFClient()

# Create Azure OpenAI connection
aoai_connection = AzureOpenAIConnection(
    name="aoai-connection",
    api_key=os.environ["AZURE_OPENAI_KEY"],
    api_base=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_version="2024-05-01-preview"
)
pf.connections.create_or_update(aoai_connection)

# Create Azure AI Search connection
search_connection = CognitiveSearchConnection(
    name="search-connection",
    api_key=os.environ["SEARCH_KEY"],
    api_base=os.environ["SEARCH_ENDPOINT"]
)
pf.connections.create_or_update(search_connection)

# List connections
for conn in pf.connections.list():
    print(f"{conn.name}: {conn.type}")

Running Flows

Local Execution

from promptflow import PFClient

pf = PFClient()

# Run single input
result = pf.test(
    flow="./rag_flow",
    inputs={"question": "What is Azure AI Studio?"}
)
print(result["answer"])

# Batch run
run = pf.run(
    flow="./rag_flow",
    data="./test_questions.jsonl",
    column_mapping={
        "question": "${data.question}"
    }
)

# Get results
details = pf.get_details(run)
print(details)

Streaming Output

from promptflow import PFClient

pf = PFClient()

# Stream response
for chunk in pf.test(
    flow="./rag_flow",
    inputs={"question": "Explain microservices"},
    stream=True
):
    print(chunk, end="", flush=True)
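
A flow can also be loaded as a plain Python callable, which is handy when embedding it in an existing service. A sketch using the flow-as-a-function pattern (assumes the flow exposes an answer output as above):

from promptflow import load_flow

# Load the flow directory as a callable
rag = load_flow(source="./rag_flow")

result = rag(question="What is Prompt Flow?")
print(result["answer"])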

Evaluation Flows

# eval_flow.dag.yaml
$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json

inputs:
  question:
    type: string
  ground_truth:
    type: string
  prediction:
    type: string

outputs:
  score:
    type: object
    reference: ${aggregate.output}

nodes:
  - name: relevance_score
    type: llm
    source:
      type: code
      path: relevance.prompty
    inputs:
      question: ${inputs.question}
      answer: ${inputs.prediction}

  - name: groundedness_score
    type: llm
    source:
      type: code
      path: groundedness.prompty
    inputs:
      answer: ${inputs.prediction}
      context: ${inputs.ground_truth}

  - name: aggregate
    type: python
    source:
      type: code
      path: aggregate.py
    inputs:
      relevance: ${relevance_score.output}
      groundedness: ${groundedness_score.output}

# aggregate.py
from promptflow import tool

@tool
def aggregate(relevance: str, groundedness: str) -> dict:
    """Aggregate evaluation scores."""
    def parse_score(text: str) -> float:
        try:
            return float(text.strip())
        except ValueError:
            return 0.0

    relevance_score = parse_score(relevance)
    groundedness_score = parse_score(groundedness)
    return {
        "relevance": relevance_score,
        "groundedness": groundedness_score,
        "overall": (relevance_score + groundedness_score) / 2
    }
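
In practice the evaluation flow runs against the outputs of a previous batch run, with the base run's answers mapped into the prediction input. A sketch (assumes test_questions.jsonl also has a ground_truth column):

from promptflow import PFClient

pf = PFClient()

# Base run over the test questions
base_run = pf.run(
    flow="./rag_flow",
    data="./test_questions.jsonl",
    column_mapping={"question": "${data.question}"}
)

# Evaluate the base run's outputs
eval_run = pf.run(
    flow="./eval_flow",
    data="./test_questions.jsonl",
    run=base_run,
    column_mapping={
        "question": "${data.question}",
        "ground_truth": "${data.ground_truth}",
        "prediction": "${run.outputs.answer}"
    }
)

print(pf.get_details(eval_run))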

Variant Testing

Test different prompt versions:

# flow.dag.yaml with variants
nodes:
  - name: generate_answer
    use_variants: true

node_variants:
  generate_answer:
    default_variant_id: variant_0
    variants:
      variant_0:
        node:
          type: llm
          source:
            type: code
            path: generate_v1.prompty
          inputs:
            context: ${retrieve_context.output}
            question: ${inputs.question}
      variant_1:
        node:
          type: llm
          source:
            type: code
            path: generate_v2.prompty
          inputs:
            context: ${retrieve_context.output}
            question: ${inputs.question}
            temperature: 0.5

# Compare variants
from promptflow import PFClient

pf = PFClient()

# Run with different variants
run_v0 = pf.run(
    flow="./rag_flow",
    data="./test_data.jsonl",
    variant="${generate_answer.variant_0}"
)

run_v1 = pf.run(
    flow="./rag_flow",
    data="./test_data.jsonl",
    variant="${generate_answer.variant_1}"
)

# Compare results side by side
print(pf.get_details(run_v0))
print(pf.get_details(run_v1))

# Or open the built-in visual comparison report
pf.visualize([run_v0, run_v1])

Deployment

Deploy to Azure

from azure.identity import DefaultAzureCredential
from promptflow.azure import PFClient as AzurePFClient

azure_pf = AzurePFClient(
    credential=DefaultAzureCredential(),
    subscription_id="your-subscription",
    resource_group_name="your-rg",
    workspace_name="your-workspace"
)

# Create deployment
deployment = azure_pf.flows.deploy(
    flow="./rag_flow",
    deployment_name="rag-deployment",
    endpoint_name="rag-endpoint",
    instance_type="Standard_DS3_v2",
    instance_count=1
)

print(f"Endpoint: {deployment.endpoint_url}")

Deploy as Docker

# Build Docker image
pf flow build --source ./rag_flow --output ./docker_build --format docker

# Build and run
cd docker_build
docker build -t rag-flow:latest .
docker run -p 8080:8080 -e AZURE_OPENAI_KEY=$AZURE_OPENAI_KEY rag-flow:latest
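
Once the container is running, the flow is served over HTTP. A sketch of calling it (assumes the default /score route used by the generated Dockerfile):

# Call the containerized flow
curl -X POST http://localhost:8080/score \
  -H "Content-Type: application/json" \
  -d '{"question": "What is Prompt Flow?"}'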

Deploy as Python Package

# Build package
pf flow build --source ./rag_flow --output ./package_build --format python

# Install and use
cd package_build
pip install -e .

# Use in code
from rag_flow import FlowExecutor

executor = FlowExecutor()
result = executor.run(question="What is AI?")

Monitoring in Production

# Enable telemetry
import os

from promptflow.tracing import start_trace

start_trace(
    collection="my-rag-app",
    connection_string=os.environ["APPLICATIONINSIGHTS_CONNECTION_STRING"]
)

# Traces are automatically collected
result = pf.test(
    flow="./rag_flow",
    inputs={"question": "What is Azure?"}
)

# Custom spans
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("custom_operation"):
    # Your code here
    pass

Best Practices

1. Modular Design

flows/
├── rag_flow/
│   ├── flow.dag.yaml
│   ├── retrieve.py
│   ├── generate.prompty
│   └── requirements.txt
├── eval_flow/
│   ├── flow.dag.yaml
│   └── evaluators/
└── shared/
    └── tools/

2. Environment Management

# requirements.txt
promptflow>=1.5.0
promptflow-tools>=1.0.0
azure-search-documents>=11.4.0
openai>=1.0.0

3. Testing Strategy

# test_flow.py
import pytest
from promptflow import PFClient

pf = PFClient()

def test_retrieve_returns_content():
    result = pf.test(
        flow="./rag_flow",
        inputs={"question": "What is Azure?"},
        node="retrieve_context"
    )
    assert result is not None
    assert len(result) > 0

def test_end_to_end():
    result = pf.test(
        flow="./rag_flow",
        inputs={"question": "What is Azure AI Studio?"}
    )
    assert "Azure" in result["answer"]

What’s Next

Tomorrow I’ll cover evaluation improvements in Azure AI Studio.
