8 min read
Copilot Actions and Connectors: Extending AI Capabilities
Copilot Actions and Connectors enable AI assistants to interact with external systems. Let’s explore how to build powerful integrations that extend your copilot’s capabilities.
Understanding the Action Framework
User Request → Copilot → Action Router → Connector → External System
                  ↑                                         ↓
                  └──────── Response ←──── Transform ←──────┘
Building Custom Actions
Simple Action: Data Lookup
Actions in Copilot Studio are implemented using Power Automate flows or Azure Functions.
# Azure Function implementation for customer lookup action
import azure.functions as func
import json
from azure.identity import DefaultAzureCredential
def main(req: func.HttpRequest) -> func.HttpResponse:
    """Handle the customer lookup action invoked by Copilot Studio over HTTP.

    Query parameters read from ``req.params``:
        identifier: Required. A value containing "@" is treated as an email
            address; anything else is treated as a customer id.
        include_history: Optional. Purchase history is attached only when
            this is the string "true" (compared case-insensitively).

    Returns:
        A JSON response. Status 400 with ``success: False`` when
        ``identifier`` is missing or empty; otherwise the (mock) customer
        record with the default success status.
    """
    params = req.params
    identifier = params.get('identifier')
    wants_history = params.get('include_history', 'false').lower() == 'true'

    # Guard clause: nothing can be looked up without an identifier.
    if not identifier:
        error_body = {"success": False, "message": "Identifier required"}
        return func.HttpResponse(
            json.dumps(error_body),
            status_code=400,
            mimetype="application/json",
        )

    # Connect to your CRM (example using Dataverse)
    credential = DefaultAzureCredential()
    # ... CRM lookup logic ...

    # Mock response for example: echo the identifier back in whichever
    # field it appears to belong to.
    is_email = "@" in identifier
    customer = {
        "id": "C001" if is_email else identifier,
        "name": "John Doe",
        "email": identifier if is_email else "john@example.com",
    }

    profile = {
        "id": customer["id"],
        "name": customer["name"],
        "email": customer["email"],
        "tier": "Gold",
        "account_manager": "Jane Smith",
    }
    if wants_history:
        profile["purchase_history"] = {
            "total_orders": 15,
            "lifetime_value": 5000.00,
            "recent_orders": [{"date": "2024-01-15", "amount": 150.00}],
        }

    payload = {
        "success": True,
        "data": profile,
        "message": f"Found customer: {customer['name']}",
    }
    return func.HttpResponse(json.dumps(payload), mimetype="application/json")
# In Copilot Studio:
# 1. Create a custom connector or use HTTP action
# 2. Configure the Azure Function URL
# 3. Define input/output parameters in the topic
Complex Action: Multi-Step Workflow
Multi-step workflows are best implemented using Power Automate flows.
# Power Automate flow definition for multi-step analytics report
# Flow: CreateAnalyticsReportAction
# Illustrative flow outline (not an exported Power Automate definition).
# Nesting is significant: each step's `action` and `inputs` belong to the
# list item that starts with its `name`.
trigger:
  type: "When Power Virtual Agents calls a flow"
  # Values passed in by the caller; read below via triggerBody().
  inputs:
    - report_type: string
    - date_range: string
    - recipients: array
steps:
  - name: "Gather Requirements"
    action: "Compose"
    inputs:
      report_config:
        type: "@{triggerBody()['report_type']}"
        range: "@{triggerBody()['date_range']}"
  - name: "Query Fabric Data"
    action: "HTTP"
    inputs:
      method: "POST"
      # "..." stands in for the workspace id and item path.
      uri: "https://api.fabric.microsoft.com/v1/workspaces/.../queries"
      headers:
        # "@{...}" is a placeholder for the expression that yields the token.
        Authorization: "Bearer @{...}"
  - name: "Generate Visuals"
    action: "Call Azure Function"
    # Generate charts using matplotlib/plotly
  - name: "Compile Report"
    action: "Create file"
    # Create PDF using reporting library
  - name: "Distribute"
    action: "Send Email"
    inputs:
      to: "@{triggerBody()['recipients']}"
      subject: "Your Analytics Report"
      # References the "Compile Report" step (spaces become underscores).
      attachments: "@{outputs('Compile_Report')}"
# Register this flow as an action in Copilot Studio
# Azure Function for complex report generation
class CreateAnalyticsReportAction:
    """Multi-step action to create and distribute analytics report.

    Each ``execute_<step>`` coroutine takes a ``context`` mapping. Later steps
    read the results of earlier ones from it under the earlier step's name
    (e.g. ``context["gather_requirements"]``), so the steps must run in the
    order listed in ``steps``.

    The runner that drives the steps is not part of this snippet, and neither
    are the collaborators the methods use: ``build_query``,
    ``suggest_chart_type``, ``fabric_client``, ``chart_service``, ``storage``
    and ``email_service`` must be provided on the instance.
    """

    name = "CreateAnalyticsReport"
    description = "Generate an analytics report and distribute to stakeholders"

    steps = [
        Step(
            name="gather_requirements",
            description="Collect report requirements from user"
        ),
        Step(
            name="query_data",
            description="Execute queries to gather report data"
        ),
        Step(
            name="generate_visuals",
            description="Create charts and visualizations"
        ),
        Step(
            name="compile_report",
            description="Assemble the final report"
        ),
        Step(
            name="distribute",
            description="Send report to stakeholders"
        )
    ]

    async def execute_gather_requirements(self, context) -> dict:
        """Interactive step to gather report requirements.

        Returns a dict with ``report_type``, ``date_range``, ``dimensions``,
        ``metrics`` and ``recipients``, each taken from ``context`` when
        present and otherwise filled with the default shown below.
        """
        # This would trigger a conversation to collect the fields below; in
        # this example they are read straight from the context with defaults.
        return {
            "report_type": context.get("report_type", "sales_summary"),
            "date_range": context.get("date_range", "last_month"),
            "dimensions": context.get("dimensions", ["region", "product"]),
            "metrics": context.get("metrics", ["revenue", "units"]),
            "recipients": context.get("recipients", [])
        }

    async def execute_query_data(self, context) -> dict:
        """Query data based on requirements.

        Builds a query from the gathered requirements, runs it through
        ``self.fabric_client`` and returns the data plus basic query stats.
        """
        requirements = context["gather_requirements"]

        # Build dynamic query
        query = self.build_query(
            report_type=requirements["report_type"],
            date_range=requirements["date_range"],
            dimensions=requirements["dimensions"],
            metrics=requirements["metrics"]
        )
        result = await self.fabric_client.execute_query(query)

        return {
            "data": result.to_dict(),
            "row_count": len(result),
            "query_time_ms": result.execution_time_ms
        }

    async def execute_generate_visuals(self, context) -> dict:
        """Generate visualizations from data.

        Creates one chart per (metric, dimension) pair, metrics in the outer
        loop, and returns them as ``{"visuals": [...]}``.
        """
        data = context["query_data"]["data"]
        requirements = context["gather_requirements"]
        visuals = []

        # Auto-detect appropriate chart types
        for metric in requirements["metrics"]:
            for dimension in requirements["dimensions"]:
                chart_type = self.suggest_chart_type(dimension, metric)
                visual = await self.chart_service.create(
                    data=data,
                    x=dimension,
                    y=metric,
                    chart_type=chart_type
                )
                visuals.append(visual)

        return {"visuals": visuals}

    async def execute_compile_report(self, context) -> dict:
        """Compile final report document.

        Renders a PDF (title lines followed by every visual) into memory,
        uploads it through ``self.storage`` and returns
        ``{"report_url": ...}``.
        """
        from datetime import datetime
        import io

        from reportlab.lib.pagesizes import letter
        from reportlab.pdfgen import canvas

        # Layout in PDF points. Letter pages are 792 points tall.
        left_margin = 100
        page_top = 750
        bottom_margin = 50
        visual_width, visual_height = 400, 200
        visual_gap = 20

        buffer = io.BytesIO()
        pdf = canvas.Canvas(buffer, pagesize=letter)

        # Add title
        requirements = context["gather_requirements"]
        pdf.drawString(left_margin, 750, f"Analytics Report: {requirements['report_type']}")
        pdf.drawString(left_margin, 730, f"Period: {requirements['date_range']}")

        # Add visuals
        y_position = 700
        for visual in context["generate_visuals"]["visuals"]:
            # Start a new page when the next image would fall below the
            # bottom margin; otherwise it would be drawn at a negative y and
            # never appear (this already happens with four visuals).
            if y_position - visual_height < bottom_margin:
                pdf.showPage()
                y_position = page_top
            pdf.drawImage(
                visual.image_path,
                left_margin,
                y_position - visual_height,
                width=visual_width,
                height=visual_height,
            )
            y_position -= visual_height + visual_gap

        pdf.save()
        buffer.seek(0)  # rewind so the upload reads from the start

        # Upload to storage
        report_url = await self.storage.upload(
            content=buffer,
            filename=f"report_{datetime.now().isoformat()}.pdf"
        )

        return {"report_url": report_url}

    async def execute_distribute(self, context) -> dict:
        """Distribute report to stakeholders.

        Sends one email per recipient and returns the recipient list with an
        ISO-formatted timestamp taken after the last send.
        """
        from datetime import datetime

        recipients = context["gather_requirements"]["recipients"]
        report_url = context["compile_report"]["report_url"]

        # Send via email
        for recipient in recipients:
            await self.email_service.send(
                to=recipient,
                subject="Your Analytics Report is Ready",
                body=f"Your requested report is available: {report_url}",
                attachments=[report_url]
            )

        return {
            "distributed_to": recipients,
            "distribution_time": datetime.now().isoformat()
        }
Building Connectors
REST API Connector
Connectors are defined using OpenAPI specifications and registered in Power Platform.
# OpenAPI connector definition for Microsoft Fabric
# fabric-connector.swagger.json
swagger: "2.0"
info:
  title: "Microsoft Fabric Connector"
  version: "1.0"
host: "api.fabric.microsoft.com"
basePath: "/v1"
schemes: ["https"]
securityDefinitions:
  oauth2:
    type: "oauth2"
    flow: "accessCode"
    # v2.0 endpoints, matching the Python FabricConnector.oauth_config in this
    # article; the ".default" scope below is a v2.0 convention.
    authorizationUrl: "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
    tokenUrl: "https://login.microsoftonline.com/common/oauth2/v2.0/token"
    scopes:
      "https://api.fabric.microsoft.com/.default": "Access Fabric"
paths:
  /workspaces:
    get:
      operationId: "ListWorkspaces"
      summary: "List workspaces"
      responses:
        # Status codes are quoted so they stay string keys.
        "200":
          description: "Success"
  /workspaces/{workspaceId}/items:
    get:
      operationId: "ListItems"
      summary: "List items in a workspace"
      parameters:
        - name: workspaceId
          in: path
          required: true
          type: string
      # Every Swagger 2.0 operation must declare a responses object.
      responses:
        "200":
          description: "Success"
# Python implementation for custom connector logic
class FabricConnector:
    """Declarative connector description for the Microsoft Fabric REST API.

    The class attributes carry display metadata, the OAuth2 settings, the
    base URL and a table of named operations. ``AuthType`` and ``Operation``
    come from the surrounding connector framework, which is not shown in
    this snippet.
    """

    name = "Microsoft Fabric"
    description = "Connect to Microsoft Fabric workspaces and execute queries"
    icon_url = "https://assets.microsoft.com/fabric-icon.png"

    auth = AuthType.OAUTH2
    oauth_config = {
        "authorization_url": "https://login.microsoftonline.com/common/oauth2/v2.0/authorize",
        "token_url": "https://login.microsoftonline.com/common/oauth2/v2.0/token",
        "scopes": ["https://api.fabric.microsoft.com/.default"]
    }

    base_url = "https://api.fabric.microsoft.com/v1"

    # Operation name -> HTTP method, path template and schemas.
    operations = {
        "list_workspaces": Operation(
            method="GET",
            path="/workspaces",
            description="List all accessible workspaces",
            response_schema={
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "id": {"type": "string"},
                        "displayName": {"type": "string"},
                        "type": {"type": "string"}
                    }
                }
            }
        ),
        "execute_query": Operation(
            method="POST",
            path="/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/queries",
            description="Execute a SQL query against a lakehouse",
            parameters={
                "workspace_id": {"type": "string", "required": True},
                "lakehouse_id": {"type": "string", "required": True}
            },
            body_schema={
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                    "maxRows": {"type": "integer", "default": 1000}
                }
            }
        ),
        "get_table_schema": Operation(
            method="GET",
            path="/workspaces/{workspace_id}/lakehouses/{lakehouse_id}/tables/{table_name}/schema",
            description="Get schema for a lakehouse table"
        )
    }

    def transform_response(self, operation: str, response: dict) -> dict:
        """Reshape a raw API response into the form handed to the copilot.

        Only ``execute_query`` responses are reshaped: they are condensed to
        a summary line, the column list, the first ten result rows and the
        total row count. Responses for any other operation are returned
        unchanged.
        """
        if operation != "execute_query":
            return response

        rows = response.get("results", [])
        row_count = len(rows)
        columns = response.get("schema", {}).get("columns", [])
        return {
            "summary": f"Query returned {row_count} rows",
            "columns": columns,
            "data": rows[:10],  # Sample for display
            "total_rows": row_count,
        }
Database Connector
For database connections, use Power Platform SQL connectors or create custom connectors.
# Custom database connector using Azure Functions
import pyodbc
import json
import azure.functions as func
class AzureSQLConnector:
    """Connector for Azure SQL Database.

    ``execute_query`` is awaited by the methods below but is not defined in
    this snippet; it must be supplied on the instance or by a subclass.
    ``get_schema`` indexes each returned row by column name, so rows are
    expected to support ``row["COLUMN_NAME"]``-style access.
    """

    name = "Azure SQL"
    description = "Connect to Azure SQL databases"

    connection_parameters = {
        "server": {"type": "string", "required": True},
        "database": {"type": "string", "required": True},
        "authentication": {
            "type": "enum",
            "values": ["sql", "azure_ad", "managed_identity"],
            "default": "managed_identity"
        }
    }

    async def test_connection(self) -> bool:
        """Test database connectivity.

        Returns:
            True when a trivial ``SELECT 1`` succeeds, False when it raises.
        """
        try:
            await self.execute_query("SELECT 1")
        except Exception:
            # Deliberately broad: this is a yes/no probe, so any failure is
            # reported as "not connected" rather than propagated.
            return False
        return True

    async def get_schema(self) -> dict:
        """Get database schema for AI context.

        Returns:
            A dict keyed by ``"<schema>.<table>"`` whose values are
            ``{"columns": [{"name", "type", "nullable"}, ...]}``, with
            columns in the order the query returns them.
        """
        # Join on schema AND table name: joining on TABLE_NAME alone mixes up
        # the columns of same-named tables that live in different schemas.
        query = """
            SELECT
                t.TABLE_SCHEMA,
                t.TABLE_NAME,
                c.COLUMN_NAME,
                c.DATA_TYPE,
                c.IS_NULLABLE
            FROM INFORMATION_SCHEMA.TABLES t
            JOIN INFORMATION_SCHEMA.COLUMNS c
                ON t.TABLE_SCHEMA = c.TABLE_SCHEMA
                AND t.TABLE_NAME = c.TABLE_NAME
            WHERE t.TABLE_TYPE = 'BASE TABLE'
            ORDER BY t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION
        """
        result = await self.execute_query(query)

        # Transform to hierarchical structure
        schema = {}
        for row in result:
            table_key = f"{row['TABLE_SCHEMA']}.{row['TABLE_NAME']}"
            table_entry = schema.setdefault(table_key, {"columns": []})
            table_entry["columns"].append({
                "name": row["COLUMN_NAME"],
                "type": row["DATA_TYPE"],
                "nullable": row["IS_NULLABLE"] == "YES"
            })
        return schema

    def generate_natural_language_context(self, schema: dict) -> str:
        """Generate schema description for AI.

        Args:
            schema: Mapping in the shape returned by ``get_schema``.

        Returns:
            One line per table, newline-joined, in the form
            ``Table <name> has columns: <col> (<type>), ...``. An empty
            schema yields an empty string.
        """
        descriptions = []
        for table, info in schema.items():
            cols = ", ".join(f"{c['name']} ({c['type']})" for c in info["columns"])
            descriptions.append(f"Table {table} has columns: {cols}")
        return "\n".join(descriptions)
Chaining Actions and Connectors
Action chains are implemented using Power Automate flow sequences.
# Power Automate flow for chained data insights pipeline
# Illustrative flow outline; each step's keys are nested under its list item.
name: "DataInsightsPipeline"
trigger: "When Power Virtual Agents calls a flow"
steps:
  # Step 1: Query data
  - name: "QueryFabric"
    action: "HTTP"
    uri: "https://api.fabric.microsoft.com/..."
  # Step 2: Analyze data
  - name: "AnalyzeData"
    action: "Azure Function"
    inputs:
      data: "@{outputs('QueryFabric')}"
  # Step 3: Generate narrative
  - name: "GenerateNarrative"
    action: "Azure OpenAI"
    inputs:
      prompt: "Analyze: @{outputs('AnalyzeData')}"
  # Step 4: Create visualization (conditional)
  - name: "CreateVisualization"
    # Gated on the flag in the AnalyzeData output.
    condition: "@{outputs('AnalyzeData')['visualization_recommended']}"
    action: "Azure Function"
# Return combined results to Copilot
# Python implementation of chained pipeline
class DataInsightsPipeline:
    """Chain multiple actions for end-to-end insights.

    ``chain`` is an ordered list of steps. Each step names an action, maps
    ``${...}`` placeholders to that action's inputs, and stores the action's
    result in the shared context under ``output_key`` so later steps can
    refer to it. A step with a ``conditional`` entry is skipped when that
    condition evaluates falsy.

    ``evaluate_condition``, ``map_inputs`` and ``execute_action`` are used by
    ``execute`` but are not defined in this snippet; they must be supplied on
    the instance or by a subclass.
    """

    name = "GenerateDataInsights"
    description = "Query data, analyze it, and generate insights"

    chain = [
        {
            "action": "FabricConnector.execute_query",
            "input_mapping": {
                # Must name a key that execute() seeds into the context; the
                # context holds "user_question" (there is no "user_query").
                "query": "${user_question}",
                "workspace_id": "${workspace_id}"
            },
            # NOTE(review): FabricConnector.execute_query also declares a
            # required lakehouse_id, which nothing here supplies - confirm
            # how it is meant to be provided.
            "output_key": "query_result"
        },
        {
            "action": "AnalyzeData",
            "input_mapping": {
                "data": "${query_result.data}"
            },
            "output_key": "analysis"
        },
        {
            "action": "GenerateNarrative",
            "input_mapping": {
                "analysis": "${analysis}",
                "original_question": "${user_question}"
            },
            "output_key": "narrative"
        },
        {
            "action": "CreateVisualization",
            "input_mapping": {
                "data": "${query_result.data}",
                "chart_suggestion": "${analysis.suggested_chart}"
            },
            "output_key": "visualization",
            "conditional": "${analysis.visualization_recommended}"
        }
    ]

    async def execute(self, user_question: str, workspace_id: str) -> dict:
        """Run every step of ``chain`` in order and collect the results.

        Args:
            user_question: The user's question; seeded into the context as
                ``user_question``.
            workspace_id: Seeded into the context as ``workspace_id``.

        Returns:
            A dict with ``narrative``, ``visualization`` and ``raw_data``.
            An entry is None when the step that produces it did not run.
        """
        context = {
            "user_question": user_question,
            "workspace_id": workspace_id
        }

        for step in self.chain:
            # Check conditional: skip the step when its condition is falsy.
            if "conditional" in step and not self.evaluate_condition(
                step["conditional"], context
            ):
                continue

            # Map inputs
            inputs = self.map_inputs(step["input_mapping"], context)

            # Execute action
            result = await self.execute_action(step["action"], inputs)

            # Store output
            context[step["output_key"]] = result

        return {
            "narrative": context.get("narrative"),
            "visualization": context.get("visualization"),
            "raw_data": context.get("query_result", {}).get("data")
        }
Actions and connectors are the bridge between conversational AI and enterprise systems. Build them with security, error handling, and user experience in mind.