## Workflow Agent Reflection Sample (Python)
This sample demonstrates how to wrap an Agent Framework workflow (with iterative review + improvement) as an agent using the Container Agents Adapter. It implements a "reflection" pattern consisting of two executors:
- Worker: Produces an initial answer (and revised answers after feedback)
- Reviewer: Evaluates the answer against quality criteria and either approves or returns constructive feedback
The workflow cycles until the Reviewer approves the response. Only approved content is emitted externally (streamed the same way as a normal agent response). This pattern is useful for quality‑controlled assistance, gated tool use, evaluative chains, or iterative refinement.
### Key Concepts Shown
- `WorkflowBuilder` + `.as_agent()` to expose a workflow as a standard agent
- Bidirectional edges enabling cyclical review (Worker ↔ Reviewer)
- Structured output parsing (Pydantic model) for review feedback
- Emitting `AgentRunUpdateEvent` to stream only approved messages
- Managing pending requests and re‑submission with incorporated feedback
File: `workflow_agent_simple.py`
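The sketch below shows roughly how these pieces fit together. It is a simplified outline, not the sample itself: the executor bodies, the `id` arguments, and the exact `WorkflowBuilder` / `WorkflowContext` signatures are assumptions based on the public Agent Framework Python API, so treat `workflow_agent_simple.py` as the authoritative wiring.

```python
# Hedged sketch of the Worker <-> Reviewer cycle exposed as an agent.
# Names and signatures are assumed from the public Agent Framework API.
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler


class Worker(Executor):
    """Produces an answer (canned here; a model call in the real sample)."""

    @handler
    async def answer(self, question: str, ctx: WorkflowContext[str]) -> None:
        # Forward a draft answer to the Reviewer.
        await ctx.send_message(f"Draft answer to: {question}")


class Reviewer(Executor):
    """Approves or rejects the draft (a trivial rule here; a model call in the sample)."""

    @handler
    async def review(self, draft: str, ctx: WorkflowContext[str]) -> None:
        # The real sample returns structured feedback and loops until approval;
        # only approved content leaves the workflow.
        await ctx.yield_output(draft)


worker = Worker(id="worker")
reviewer = Reviewer(id="reviewer")

workflow = (
    WorkflowBuilder()
    .set_start_executor(worker)   # user input enters at the Worker
    .add_edge(worker, reviewer)   # Worker -> Reviewer: submit draft for review
    .add_edge(reviewer, worker)   # Reviewer -> Worker: feedback closes the cycle
    .build()
)

# Expose the whole review loop as a single agent the adapter can host.
agent = workflow.as_agent()
```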
---
## Prerequisites
> **Azure sign-in:** Run `az login` before starting the sample so `DefaultAzureCredential` can acquire a CLI token.
Dependencies used by `workflow_agent_simple.py`:
- agent-framework-azure-ai (published package with workflow abstractions)
- agents_adapter
- azure-identity (for `DefaultAzureCredential`)
- python-dotenv (loads `.env` for local credentials)
- pydantic (pulled transitively; listed for clarity)
Install the dependencies (run from the repo root `container_agents/`; the adapter is installed in editable mode from the local source):
```bash
pip install agent-framework-azure-ai azure-identity python-dotenv
pip install -e src/adapter/python
```
---
## Additional Requirements
1. An Azure AI project with a model deployment (Microsoft-hosted models, Azure OpenAI models, or custom models exposed via Azure AI Foundry).
---
## Configuration
Copy `.envtemplate` to `.env` and fill in real values:
```
AZURE_AI_PROJECT_ENDPOINT=<foundry-project-endpoint>
AZURE_AI_MODEL_DEPLOYMENT_NAME=<model-deployment-name>
AGENT_PROJECT_NAME=<agent-project-name-optional>
```
`AGENT_PROJECT_NAME` lets you override the default Azure AI agent project for this workflow; omit it to fall back to the SDK default.
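For reference, this is roughly how the sample can pick up those values at startup; a minimal sketch using `python-dotenv` (the actual variable handling lives in `workflow_agent_simple.py`):

```python
import os

from dotenv import load_dotenv

# Load the .env file sitting next to the script.
load_dotenv()

endpoint = os.environ["AZURE_AI_PROJECT_ENDPOINT"]
deployment = os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"]
# Optional override; fall back to the SDK default when unset.
project_name = os.environ.get("AGENT_PROJECT_NAME")
```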
---
## Run the Workflow Agent
From this folder:
```bash
python workflow_agent_simple.py
```
The server (via the adapter) will start on `0.0.0.0:8088` by default.
---
## Send a Non‑Streaming Request
```bash
curl -sS \
-H "Content-Type: application/json" \
-X POST http://localhost:8088/runs \
-d '{"input":"Explain the concept of reflection in this workflow sample.","stream":false}'
```
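The same request can also be sent from Python. This is a minimal sketch using only the standard library; the `/runs` endpoint and payload mirror the curl call above:

```python
import json
import urllib.request

payload = {
    "input": "Explain the concept of reflection in this workflow sample.",
    "stream": False,
}
req = urllib.request.Request(
    "http://localhost:8088/runs",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode("utf-8"))
```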
Sample output (non‑streaming):
```
Processing 1 million files in parallel and writing their contents into a sorted output file can be a computationally and resource-intensive task. To handle it effectively, you can use Python with libraries like `concurrent.futures` for parallelism and `heapq` for the sorting and merging.
Below is an example implementation:
import os
from concurrent.futures import ThreadPoolExecutor
import heapq
def read_file(file_path):
    """Read the content of a single file and return it as a list of lines."""
    with open(file_path, 'r') as file:
        return file.readlines()

def parallel_read_files(file_paths, max_workers=8):
    """
    Read files in parallel and return all the lines in memory.
    :param file_paths: List of file paths to read.
    :param max_workers: Number of worker threads to use for parallelism.
    """
    all_lines = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit tasks to read each file in parallel
        results = executor.map(read_file, file_paths)
        # Collect the results
        for lines in results:
            all_lines.extend(lines)
    return all_lines

def write_sorted_output(lines, output_file_path):
    """
    Write sorted lines to the output file.
    :param lines: List of strings to be sorted and written.
    :param output_file_path: File path to write the sorted result.
    """
    sorted_lines = sorted(lines)
    with open(output_file_path, 'w') as output_file:
        output_file.writelines(sorted_lines)

def main(directory_path, output_file_path):
    """
    Main function to read files in parallel and write sorted output.
    :param directory_path: Path to the directory containing input files.
    :param output_file_path: File path to write the sorted output.
    """
    # Get a list of all the file paths in the given directory
    file_paths = [os.path.join(directory_path, f) for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f))]
    print(f"Found {len(file_paths)} files. Reading files in parallel...")
    # Read all lines from the files in parallel
    all_lines = parallel_read_files(file_paths)
    print(f"Total lines read: {len(all_lines)}. Sorting and writing to output file...")
    # Write the sorted lines to the output file
    write_sorted_output(all_lines, output_file_path)
    print(f"Sorted output written to: {output_file_path}")

if __name__ == "__main__":
    # Replace these paths with the appropriate input directory and output file path
    input_directory = "path/to/input/directory"  # Directory containing 1 million files
    output_file = "path/to/output/sorted_output.txt"  # Output file path
    main(input_directory, output_file)
### Key Features and Steps:
1. **Parallel Reading with `ThreadPoolExecutor`**:
- Files are read in parallel using threads to improve I/O performance since reading many files is mostly I/O-bound.
2. **Sorting and Writing**:
- Once all lines are aggregated into memory, they are sorted using Python's `sorted()` function and written to the output file in one go.
3. **Handles Large Number of Files**:
- The program uses threads to manage the potentially massive number of files in parallel, saving time instead of processing them serially.
### Considerations:
- **Memory Usage**: This script reads all file contents into memory. If the total size of the files is too large, you may encounter memory issues. In such cases, consider processing the files in smaller chunks.
- **Sorting**: For extremely large data, consider using an external/merge sort technique to handle sorting in smaller chunks.
- **I/O Performance**: Ensure that your I/O subsystem and disk can handle the load.
Let me know if you'd like an optimized version to handle larger datasets with limited memory!
Usage (if provided): None
```
---
## Send a Streaming Request (Server-Sent Events)
```bash
curl -N \
-H "Content-Type: application/json" \
-X POST http://localhost:8088/runs \
-d '{"input":"How does the reviewer decide to approve?","stream":true}'
```
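From Python, the SSE stream can be consumed line by line. This is a rough sketch using `requests` (not in the dependency list above, so install it separately if you try this); the exact event payload shape depends on the adapter, so treat the parsing as illustrative:

```python
import requests

payload = {"input": "How does the reviewer decide to approve?", "stream": True}
with requests.post("http://localhost:8088/runs", json=payload, stream=True) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        # SSE data lines are prefixed with "data: "; skip keep-alives and blanks.
        if line and line.startswith("data: "):
            print(line[len("data: "):])
```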
Sample output (streaming):
```
Here is a Python script that demonstrates parallel reading of 1 million files using `concurrent.futures` for parallelism and `heapq` to write the outputs to a sorted file. This approach ensures efficiency when dealing with such a large number of files.
import os
import heapq
from concurrent.futures import ThreadPoolExecutor
def read_file(file_path):
    """
    Read the content of a single file and return it as a list of lines.
    """
    with open(file_path, 'r') as file:
        return file.readlines()

def parallel_read_files(file_paths, max_workers=4):
    """
    Read multiple files in parallel.
    """
    all_lines = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit reading tasks to the thread pool
        futures = [executor.submit(read_file, file_path) for file_path in file_paths]
        # Gather results as they are completed
        for future in futures:
            all_lines.extend(future.result())
    return all_lines

def write_sorted_output(lines, output_file):
    """
    Write sorted lines to an output file.
    """
    sorted_lines = sorted(lines)
    with open(output_file, 'w') as file:
        file.writelines(sorted_lines)

if __name__ == "__main__":
    # Set the directory containing your input files
    input_directory = 'path_to_your_folder_with_files'
    # Get the list of all input files
    file_paths = [os.path.join(input_directory, f) for f in os.listdir(input_directory) if os.path.isfile(os.path.join(input_directory, f))]
    # Specify the number of threads for parallel processing
    max_threads = 8  # Adjust according to your system's capabilities
    # Step 1: Read all files in parallel
    print("Reading files in parallel...")
    all_lines = parallel_read_files(file_paths, max_workers=max_threads)
    # Step 2: Write the sorted data to the output file
    output_file = 'sorted_output.txt'
    print(f"Writing sorted output to {output_file}...")
    write_sorted_output(all_lines, output_file)
    print("Operation complete.")
### Key Points:
1. **Parallel Read**: The reading of files is handled using `concurrent.futures.ThreadPoolExecutor`, allowing multiple files to be processed simultaneously.
2. **Sorted Output**: After collecting all lines from the files, the `sorted()` function is used to sort the content in memory. This ensures that the final output file will have all data in sorted order.
3. **Adjustable Parallelism**: The `max_threads` parameter can be modified to control the number of threads used for file reading. The value should match your system's capabilities for optimal performance.
4. **Large Data Handling**: If the data from 1 million files is too large to fit into memory, consider using an external merge sort algorithm or a library like `pysort` for efficient external sorting.
Let me know if you'd like improvements or adjustments for more specific scenarios!
Final usage (if provided): None
```
> Only the final approved assistant content is emitted as normal output deltas; intermediate review feedback stays internal.
---
## How the Reflection Loop Works
1. User query enters the workflow (Worker start executor)
2. Worker produces an answer via a model call
3. Reviewer evaluates using a structured schema (`feedback`, `approved`)
4. If not approved: Worker augments context with feedback + regeneration instruction, then re‑answers
5. Loop continues until `approved=True`
6. Approved content is emitted as `AgentRunResponseUpdate` (streamed externally)
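The control flow can be summarized in plain Python. The sketch below is illustrative only: the `ReviewResult` fields mirror the schema described above, while `generate_answer` and `review_answer` are hypothetical stand-ins for the sample's model-backed executors.

```python
from pydantic import BaseModel


class ReviewResult(BaseModel):
    """Structured reviewer verdict, mirroring the schema described above."""
    feedback: str
    approved: bool


def reflect(question: str, generate_answer, review_answer, max_rounds: int = 5) -> str:
    """Run the Worker/Reviewer cycle until the Reviewer approves.

    `generate_answer(question, feedback)` and `review_answer(answer)` are
    stand-ins: the latter is assumed to return a dict matching ReviewResult.
    """
    feedback = None
    answer = ""
    for _ in range(max_rounds):
        # Worker: produce (or revise) an answer, folding in prior feedback.
        answer = generate_answer(question, feedback)
        # Reviewer: structured verdict parsed into the Pydantic model.
        review = ReviewResult.model_validate(review_answer(answer))
        if review.approved:
            return answer  # only approved content is emitted externally
        feedback = review.feedback
    return answer  # safety cap; the real sample loops until approval
```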
---
## Troubleshooting
| Issue | Resolution |
|-------|------------|
| `DefaultAzureCredential` errors | Run `az login` or configure a service principal. |
| Empty / no streaming | Confirm `stream` flag in request JSON and that the event loop is healthy. |
| Model 404 / deployment error | Verify `AZURE_AI_MODEL_DEPLOYMENT_NAME` exists in the Azure AI project configured by `AZURE_AI_PROJECT_ENDPOINT`. |
| `.env` not loading | Ensure `.env` sits beside the script (or set `dotenv_path`) and that `python-dotenv` is installed. |
---
## Related Resources
- Agent Framework repo: https://github.com/microsoft/agent-framework
- The basic (simple) sample README, which shares this folder structure, for installation reference
---
## License & Support
This sample follows the repository's LICENSE. For questions about unreleased Agent Framework features, contact the Agent Framework team via its GitHub repository.