Example updated and adapted from Mohammad Ghodratigohar's 2022 YouTube tutorial for contemporary Azure Data Explorer (Updated July 2025).
This full notebook is available for download at:
https://github.com/LaurieRhodes/PUBLIC-Scripts/tree/main/Jupyter_ADX
This demonstrates how to integrate Machine Learning into Azure Data Explorer data processing.
Overview
This notebook demonstrates the complete end-to-end machine learning workflow using Azure Machine Learning (AML) and Azure Data Explorer (ADX) for binary classification of room occupancy. The tutorial covers:
- Data Storage: Utilizing ADX for efficient time-series data storage and querying
- Model Training: Training multiple classification models using Azure ML compute clusters
- Model Deployment: Deploying the trained model for inference using ADX's Python plugin
- Integration: Seamless integration between ADX and AML for operationalized ML workflows
Business Context
Room occupancy prediction is a critical component in:
- Smart Building Management: Optimizing HVAC systems for energy efficiency
- Space Utilization: Understanding building usage patterns
- Security Systems: Detecting unauthorized access or anomalous occupancy patterns
- IoT Analytics: Processing sensor data at scale
Dataset Description
The occupancy detection dataset contains environmental sensor readings from an office space:
- Temperature: Room temperature (°C)
- Humidity: Relative humidity (%)
- Light: Illumination levels (Lux)
- CO2: Carbon dioxide concentration (ppm)
- HumidityRatio: Derived humidity metric
- Occupancy: Binary target variable (True/False)
- Test: Train/test split indicator
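The HumidityRatio column is derived from temperature and relative humidity. The dataset's exact derivation is not documented here, but a minimal sketch using the standard Magnus psychrometric approximation (an assumption) reproduces the sample values closely:

```python
import math

def humidity_ratio(temp_c: float, rel_humidity_pct: float, pressure_pa: float = 101325.0) -> float:
    """Approximate humidity ratio (kg water vapour / kg dry air).

    Uses the Magnus approximation for saturation vapour pressure and
    assumes sea-level atmospheric pressure by default.
    """
    # Saturation vapour pressure (Pa), Magnus approximation
    p_ws = 610.78 * math.exp(17.27 * temp_c / (temp_c + 237.3))
    # Partial pressure of water vapour (Pa)
    p_w = (rel_humidity_pct / 100.0) * p_ws
    # Mass ratio of water vapour to dry air (0.62198 = ratio of molar masses)
    return 0.62198 * p_w / (pressure_pa - p_w)

# Close to the dataset's HumidityRatio for an occupied sample row (22.5 °C, 32 % RH)
print(round(humidity_ratio(22.5, 32.0), 4))  # → 0.0054
```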
Prerequisites and Setup
Azure Data Explorer Requirements
Before proceeding, ensure your ADX cluster is properly configured:
- Enable Python Plugin: The Python plugin must be enabled on your ADX cluster. This is required for model inference.
- Navigate to your ADX cluster in the Azure portal
- Go to Configuration → Python plugin
- Enable the plugin (currently tested with the Python 3.11.7 DL image)
- Documentation: Python Plugin Guide
- Create Sample Dataset: Execute the following KQL commands in your ADX cluster:
// Create the OccupancyDetection table with proper schema
.create table OccupancyDetection (
Timestamp: datetime,
Temperature: real,
Humidity: real,
Light: real,
CO2: real,
HumidityRatio: real,
Occupancy: bool,
Test: bool
)
// Clear existing data and insert sample data with balanced train/test split
.clear table OccupancyDetection data
// Insert sample data with clear occupied vs unoccupied patterns
.ingest inline into table OccupancyDetection <|
2015-02-18T09:16:00.0000000Z,22.5,32.0,450.0,1800.0,0.0055,true,false
2015-02-18T09:17:00.0000000Z,23.0,33.5,480.0,1950.0,0.0058,true,false
2015-02-18T09:18:00.0000000Z,22.8,32.8,465.0,1875.0,0.0057,true,false
2015-02-18T09:19:00.0000000Z,23.2,34.0,490.0,2000.0,0.0059,true,false
2015-02-18T09:20:00.0000000Z,22.9,33.2,470.0,1920.0,0.0058,true,false
2015-02-18T10:16:00.0000000Z,18.5,22.0,0.0,350.0,0.0032,false,false
2015-02-18T10:17:00.0000000Z,18.2,21.5,0.0,340.0,0.0031,false,false
2015-02-18T10:18:00.0000000Z,18.0,21.0,0.0,330.0,0.0030,false,false
2015-02-18T10:19:00.0000000Z,18.3,21.8,0.0,345.0,0.0032,false,false
2015-02-18T10:20:00.0000000Z,18.1,21.3,0.0,335.0,0.0031,false,false
2015-02-18T11:16:00.0000000Z,22.7,32.5,455.0,1850.0,0.0056,true,true
2015-02-18T11:17:00.0000000Z,23.1,33.8,475.0,1900.0,0.0058,true,true
2015-02-18T11:18:00.0000000Z,22.6,32.2,460.0,1820.0,0.0055,true,true
2015-02-18T12:16:00.0000000Z,18.4,22.2,0.0,355.0,0.0033,false,true
2015-02-18T12:17:00.0000000Z,18.0,21.2,0.0,325.0,0.0030,false,true
2015-02-18T12:18:00.0000000Z,18.6,22.5,0.0,360.0,0.0034,false,true
Azure Machine Learning Requirements
- An Azure ML workspace
- A compute cluster for training (created automatically if it does not exist)
- Appropriate Azure Storage account with blob container
- Required Python packages (installed below)
Environment Setup and Package Installation
Install the required packages and record version information for reproducibility.
import sys
import azureml.core
from azureml.core import Workspace, Experiment, ScriptRunConfig
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
# Install KqlMagic with quiet output to avoid verbose logs
!pip install Kqlmagic --no-cache-dir --upgrade -q
# Display version information for troubleshooting
print(f"Python Version: {sys.version}")
print(f"Azure ML SDK Version: {azureml.core.VERSION}")
print("Environment setup complete.")
Azure ML Workspace Configuration
Workspace Creation
Choose one of the following approaches based on your scenario:
- Create New Workspace: Use the first cell if you need to create a new workspace
- Use Existing Workspace: Use the second cell if connecting to an existing workspace
# Option 1: Create a new Azure ML workspace
# Update these values with your Azure subscription details
SUBSCRIPTION_ID = "your-subscription-id"
RESOURCE_GROUP = "your-resource-group"
WORKSPACE_NAME = "your-workspace-name"
LOCATION = "your-azure-region" # e.g., 'eastus', 'westeurope'
# Uncomment the following lines to create a new workspace
# ws = Workspace.create(
# name=WORKSPACE_NAME,
# subscription_id=SUBSCRIPTION_ID,
# resource_group=RESOURCE_GROUP,
# location=LOCATION,
# exist_ok=True,
# show_output=True
# )
#
# # Save workspace configuration for future use
# ws.write_config()
print("Workspace creation cell - modify and uncomment as needed")
# Option 2: Connect to existing workspace using config.json
# This assumes you have a config.json file in the current directory
# or you've previously run ws.write_config()
try:
    ws = Workspace.from_config()
    print(f"Connected to workspace: {ws.name}")
    print(f"Location: {ws.location}")
    print(f"Resource Group: {ws.resource_group}")
    print(f"Subscription ID: {ws.subscription_id}")
except Exception as e:
    print(f"Error connecting to workspace: {e}")
    print("Please ensure config.json exists or create workspace using Option 1")
Experiment and Environment Setup
Create Experiment
Experiments in Azure ML provide organization and tracking for related runs. All training jobs will be tracked under this experiment.
# Create experiment for tracking all occupancy prediction runs
EXPERIMENT_NAME = "occupancy-prediction-adx"
exp = Experiment(workspace=ws, name=EXPERIMENT_NAME)
# Create custom environment with modern, stable package versions
# This replaces the legacy approach of hardcoded package versions
env = Environment(name="occupancy-detection-env-modern")
# Define conda dependencies with current best practices
conda_deps = CondaDependencies()
# Use Python 3.9 for broad compatibility and stability
conda_deps.add_conda_package("python=3.9")
# Core data science packages with minimum versions for compatibility
conda_deps.add_conda_package("pandas>=1.3.0")
conda_deps.add_conda_package("scikit-learn>=1.0.0")
conda_deps.add_conda_package("numpy>=1.21.0")
# Azure ML specific packages
conda_deps.add_pip_package("azureml-defaults")
conda_deps.add_pip_package("azure-storage-blob>=12.0.0")
# Assign dependencies to environment
env.python.conda_dependencies = conda_deps
print(f"Experiment '{EXPERIMENT_NAME}' created successfully")
print("Custom environment configured with modern package versions")
Compute Infrastructure Setup
Create or Connect to Compute Cluster
Azure ML Compute provides managed, scalable compute for training. The cluster will:
- Auto-scale: Start with minimum nodes and scale up as needed
- Cost-efficient: Scale down to minimum nodes when idle
- Managed: Azure handles OS updates, driver installation, etc.
Note: Initial cluster creation takes approximately 5 minutes. Subsequent runs reuse the existing cluster.
# Compute cluster configuration
COMPUTE_NAME = "cpu-cluster"
VM_SIZE = "STANDARD_D2_V2"  # 2 cores, 7GB RAM - suitable for small ML workloads
MIN_NODES = 0  # Cost optimization: scale to zero when idle
MAX_NODES = 2  # Limit maximum nodes to control costs

# Check if compute target already exists
if COMPUTE_NAME in ws.compute_targets:
    compute_target = ws.compute_targets[COMPUTE_NAME]
    if isinstance(compute_target, AmlCompute):
        print(f"Found existing compute target: {COMPUTE_NAME}")
        print(f"VM Size: {compute_target.vm_size}")
        print(f"Current Nodes: {compute_target.get_status().current_node_count}")
    else:
        print(f"Compute target {COMPUTE_NAME} exists but is not AmlCompute type")
else:
    print(f"Creating new compute target: {COMPUTE_NAME}")
    print(f"VM Size: {VM_SIZE}, Min Nodes: {MIN_NODES}, Max Nodes: {MAX_NODES}")
    # Create compute cluster configuration
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE,
        min_nodes=MIN_NODES,
        max_nodes=MAX_NODES,
        idle_seconds_before_scaledown=300  # Scale down after 5 minutes of inactivity
    )
    # Create the compute target
    compute_target = ComputeTarget.create(ws, COMPUTE_NAME, provisioning_config)
    # Wait for completion with timeout
    compute_target.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=10
    )
    print("Compute cluster created successfully!")
Data Exploration with Azure Data Explorer
KqlMagic Configuration
KqlMagic enables seamless integration between Jupyter notebooks and Azure Data Explorer. The configuration below addresses common connection issues.
# Configure KqlMagic to prevent common connection errors
# These settings optimize the interaction with ADX
%config Kqlmagic.auto_dataframe = True
%config Kqlmagic.short_errors = True
%config Kqlmagic.display_limit = 50
# Set environment variable for Azure ML compatibility
import os
os.environ['KQLMAGIC_AZUREML_COMPUTE'] = 'ml.azure.com'
# Reload the extension to apply configuration
%reload_ext Kqlmagic
print("KqlMagic configured successfully")
Connect to Azure Data Explorer
Important: Update the connection string below with your ADX cluster details:
- cluster: Your ADX cluster URI (e.g., 'yourcluster.region.kusto.windows.net')
- database: Your database name
# Update these values with your ADX cluster details
ADX_CLUSTER = "my-adx-cluster.australiasoutheast.kusto.windows.net"
ADX_DATABASE = "my-adx-database-security"
# Connect to Azure Data Explorer
# This will prompt for device authentication
%kql azureDataExplorer://code;cluster='{ADX_CLUSTER}';database='{ADX_DATABASE}'
print(f"Connected to ADX cluster: {ADX_CLUSTER}")
print(f"Database: {ADX_DATABASE}")
# Query the occupancy detection dataset
# Store results in 'df' variable for further analysis
%kql df << OccupancyDetection
# Display dataset information
print(f"Dataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")
# Show first few records
print("\nFirst 5 records:")
display(df.head())
# Basic data quality checks
print("\nData quality summary:")
print(f"Total records: {len(df)}")
print(f"Training records (Test=False): {len(df[df['Test'] == False])}")
print(f"Testing records (Test=True): {len(df[df['Test'] == True])}")
print(f"Occupied records: {len(df[df['Occupancy'] == True])}")
print(f"Unoccupied records: {len(df[df['Occupancy'] == False])}")
# Check for missing values
missing_values = df.isnull().sum()
if missing_values.sum() > 0:
    print("\nMissing values detected:")
    print(missing_values[missing_values > 0])
else:
    print("\nNo missing values detected.")
print("\nData exploration complete.")
Data Export to Azure Storage
Storage Configuration
To train models in Azure ML, we need to export data from ADX to Azure Blob Storage. This approach provides:
- Scalability: Handle large datasets efficiently
- Integration: Seamless connection between ADX and AML
- Versioning: Track different versions of training data
Storage Setup Instructions:
- Use the storage account automatically created with your AML workspace, or create a new one
- Create a blob container (e.g., 'ml-data')
- Generate a SAS token with read/write permissions
- You can use Azure Storage Explorer for easy management
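A SAS token is just a URL query string, so its fields can be inspected with the standard library before the export step. A small sketch (the token below is a made-up, non-functional example) checking the fields the export relies on:

```python
from urllib.parse import parse_qs

def check_sas_token(sas_token: str) -> list[str]:
    """Return a list of problems with a container SAS token (empty list = looks OK)."""
    fields = parse_qs(sas_token.lstrip('?'))
    problems = []
    if 'sig' not in fields:
        problems.append("missing signature ('sig')")
    perms = fields.get('sp', [''])[0]
    # The ADX .export command writes blobs, so the token needs both read and write
    for needed in ('r', 'w'):
        if needed not in perms:
            problems.append(f"missing permission '{needed}' in 'sp'")
    return problems

# Hypothetical read-only token: the check flags the missing write permission
print(check_sas_token("?sp=r&st=2025-07-01T00:00:00Z&se=2025-08-01T00:00:00Z&sig=abc123"))
```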
# Azure Storage configuration
# Update these values with your storage account details
STORAGE_ACCOUNT = "your-storage-account-name"
CONTAINER_NAME = "your-container-name"
SAS_TOKEN = "?sp=your-token" # Your SAS token starting with ?
# Validate configuration
if STORAGE_ACCOUNT == "your-storage-account-name":
    print("⚠️ Please update the storage configuration with your actual values")
    print("   - STORAGE_ACCOUNT: Your Azure Storage account name")
    print("   - CONTAINER_NAME: Your blob container name")
    print("   - SAS_TOKEN: Your container SAS token with read/write permissions")
else:
    print(f"✅ Storage configured: {STORAGE_ACCOUNT}/{CONTAINER_NAME}")

# Construct blob URI for ADX export
blob_container_uri = f"https://{STORAGE_ACCOUNT}.blob.core.windows.net/{CONTAINER_NAME}{SAS_TOKEN}"
print("Blob URI configured for data export")
# Export data from ADX to Azure Blob Storage
# This creates a CSV file that Azure ML can access
if 'blob_container_uri' not in locals():
    print("❌ Please configure storage settings in the previous cell first")
else:
    # Construct the ADX export command
    export_query = f".export to csv (h@'{blob_container_uri}') with(includeHeaders=all) <| OccupancyDetection"
    print("Exporting data from ADX to blob storage...")
    print(f"Export command: {export_query[:100]}...")
    # Execute the export
    %kql export_result << -query export_query
    # Validate export results
    if not export_result.empty and 'Path' in export_result.columns:
        # Extract the blob file name from the full path
        blob_path = export_result['Path'].iloc[0]
        data_blob_name = blob_path.split('/')[-1]
        print("✅ Export successful!")
        print(f"   Records exported: {export_result['NumRecords'].iloc[0]}")
        print(f"   File size: {export_result['SizeInBytes'].iloc[0]} bytes")
        print(f"   Blob name: {data_blob_name}")
        # Store for use in training script
        EXPORTED_BLOB_NAME = data_blob_name
    else:
        print("❌ Export failed - check table data and storage permissions")
        print("   Ensure the OccupancyDetection table contains data")
        print("   Verify SAS token has write permissions")
Data Validation
Verify the exported data by downloading and inspecting it locally.
# Validate exported data by downloading and inspecting
import pandas as pd
from azure.storage.blob import BlobServiceClient

if 'EXPORTED_BLOB_NAME' not in locals():
    print("❌ No exported blob found. Please run the export cell above first.")
else:
    try:
        # Create blob service client
        blob_service_client = BlobServiceClient(
            account_url=f"https://{STORAGE_ACCOUNT}.blob.core.windows.net",
            credential=SAS_TOKEN.lstrip('?')  # Remove the ? prefix
        )
        # Download blob to local file
        blob_client = blob_service_client.get_blob_client(
            container=CONTAINER_NAME,
            blob=EXPORTED_BLOB_NAME
        )
        # Download and read the CSV
        with open('validation_data.csv', 'wb') as download_file:
            download_file.write(blob_client.download_blob().readall())
        # Load and validate the data
        validation_df = pd.read_csv('validation_data.csv')
        print("✅ Data validation successful!")
        print(f"   Downloaded file shape: {validation_df.shape}")
        print(f"   Columns: {validation_df.columns.tolist()}")
        # Display sample of exported data
        print("\nSample of exported data:")
        display(validation_df.tail(5))
        print("Data export validation complete.")
    except Exception as e:
        print(f"❌ Validation failed: {e}")
        print("   Check storage account configuration and permissions")
Model Training Setup
Create Training Script Directory
Azure ML requires all training files to be in a directory that will be uploaded to the compute cluster.
# Create directory for training script and related files
import os
SCRIPT_FOLDER = os.path.join(os.getcwd(), "training_scripts")
os.makedirs(SCRIPT_FOLDER, exist_ok=True)
print(f"Training script directory created: {SCRIPT_FOLDER}")
Training Script Development
The training script below implements:
- Data Loading: Downloads data from Azure Blob Storage
- Data Preprocessing: Handles train/test splits and data validation
- Model Training: Trains multiple classification algorithms
- Model Evaluation: Cross-validation with appropriate fold sizing
- Model Persistence: Saves trained models for deployment
- Experiment Tracking: Logs metrics to Azure ML
Algorithms Evaluated:
- Decision Tree Classifier
- Logistic Regression
- K-Nearest Neighbors
- Gaussian Naive Bayes
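The training script sizes its cross-validation folds to the minority class, so CV never requests more folds than there are samples per class. That rule can be isolated as a small sketch mirroring the script's logic:

```python
def cv_folds(labels: list, max_folds: int = 5) -> int:
    """Pick a cross-validation fold count bounded by the minority class size.

    Mirrors the training script's rule: at least 2 folds, at most max_folds,
    and never more folds than samples in the smaller class.
    """
    positives = sum(bool(x) for x in labels)
    minority = min(positives, len(labels) - positives)
    return min(max_folds, max(2, minority))

print(cv_folds([True] * 3 + [False] * 2))    # minority class has 2 samples → 2
print(cv_folds([True] * 10 + [False] * 10))  # plenty of data → capped at 5
```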
%%writefile "$SCRIPT_FOLDER/train.py"
"""
Azure ML Training Script for Room Occupancy Prediction
This script trains multiple classification models on environmental sensor data
to predict room occupancy. It includes comprehensive error handling,
data validation, and experiment tracking.
Authors: Adapted from Mohammad Ghodratigohar's original example
Date: June 2025
"""
import pickle
import argparse
import pandas as pd
import numpy as np
from azure.storage.blob import BlobServiceClient
# Scikit-learn imports
from sklearn import tree
from sklearn import neighbors
from sklearn import naive_bayes
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report, confusion_matrix
# Azure ML imports
from azureml.core import Run
import os
def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Train occupancy prediction models')
    parser.add_argument('--account', type=str, required=True,
                        help='Azure Storage account name')
    parser.add_argument('--container', type=str, required=True,
                        help='Blob container name')
    parser.add_argument('--blob', type=str, required=True,
                        help='Blob file name')
    parser.add_argument('--sas', type=str, required=True,
                        help='SAS token for blob access')
    return parser.parse_args()
def download_data(storage_account, container_name, blob_name, sas_token):
    """Download training data from Azure Blob Storage."""
    try:
        print(f"Downloading data from blob: {blob_name}")
        # Create blob service client
        blob_service_client = BlobServiceClient(
            account_url=f"https://{storage_account}.blob.core.windows.net",
            credential=sas_token.lstrip('?')
        )
        # Download blob
        blob_client = blob_service_client.get_blob_client(
            container=container_name,
            blob=blob_name
        )
        with open('training_data.csv', 'wb') as download_file:
            download_file.write(blob_client.download_blob().readall())
        print("✅ Data download successful")
        return True
    except Exception as e:
        print(f"❌ Data download failed: {e}")
        return False
def load_and_validate_data():
    """Load CSV data and perform validation checks."""
    try:
        df = pd.read_csv('training_data.csv')
        print(f"Dataset loaded: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")
        # Validate required columns
        required_columns = ['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio', 'Occupancy', 'Test']
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")
        # Check for missing values
        if df.isnull().sum().sum() > 0:
            print("⚠️ Missing values detected:")
            print(df.isnull().sum())
        # Display data distribution
        print("\nData distribution:")
        print(f"  Total records: {len(df)}")
        print(f"  Occupied: {len(df[df['Occupancy'] == True])}")
        print(f"  Unoccupied: {len(df[df['Occupancy'] == False])}")
        print(f"  Training records: {len(df[df['Test'] == False])}")
        print(f"  Testing records: {len(df[df['Test'] == True])}")
        return df
    except Exception as e:
        print(f"❌ Data validation failed: {e}")
        return None
def prepare_features_and_targets(df):
    """Prepare feature matrices and target vectors for training."""
    feature_columns = ['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']
    # Check if we have proper train/test split
    train_available = (df['Test'] == False).any()
    test_available = (df['Test'] == True).any()
    if not train_available or not test_available:
        print("⚠️ No proper train/test split found. Creating manual split (70/30)...")
        n_train = int(0.7 * len(df))
        train_data = df.iloc[:n_train].copy()
        test_data = df.iloc[n_train:].copy()
    else:
        train_data = df[df['Test'] == False].copy()
        test_data = df[df['Test'] == True].copy()
    # Prepare training data
    train_x = train_data[feature_columns]
    train_y = train_data['Occupancy']
    # Prepare testing data
    test_x = test_data[feature_columns]
    test_y = test_data['Occupancy']
    print("\nData split summary:")
    print(f"  Training samples: {len(train_x)}")
    print(f"  Testing samples: {len(test_x)}")
    return train_x, train_y, test_x, test_y
def create_models():
    """Create and configure classification models."""
    models = {
        'Decision Tree': tree.DecisionTreeClassifier(random_state=42),
        'Logistic Regression': LogisticRegression(
            solver='liblinear', max_iter=1000, random_state=42
        ),
        'K Nearest Neighbors': neighbors.KNeighborsClassifier(n_neighbors=3),
        'Naive Bayes': naive_bayes.GaussianNB()
    }
    return models
def train_and_evaluate_models(models, train_x, train_y, test_x, test_y, run):
    """Train models and evaluate performance."""
    # Create outputs directory for model persistence
    os.makedirs('outputs', exist_ok=True)
    # Calculate appropriate CV folds based on dataset size
    min_samples_per_class = min(train_y.sum(), len(train_y) - train_y.sum())
    train_cv_folds = min(5, max(2, min_samples_per_class))
    test_min_samples = min(test_y.sum(), len(test_y) - test_y.sum())
    test_cv_folds = min(3, max(2, test_min_samples))
    print(f"\nUsing {train_cv_folds} folds for training CV, {test_cv_folds} folds for testing CV")
    model_results = {}
    for model_name, model in models.items():
        print(f"\n{'='*50}")
        print(f"Training {model_name}...")
        print(f"{'='*50}")
        try:
            # Train the model
            model.fit(train_x, train_y)
            # Training accuracy with cross-validation
            if len(train_x) >= train_cv_folds:
                train_scores = cross_val_score(
                    model, train_x, train_y,
                    cv=train_cv_folds, scoring='accuracy'
                )
                train_accuracy = train_scores.mean()
                train_std = train_scores.std()
            else:
                train_accuracy = model.score(train_x, train_y)
                train_std = 0.0
            # Testing accuracy with cross-validation
            if len(test_x) >= test_cv_folds:
                test_scores = cross_val_score(
                    model, test_x, test_y,
                    cv=test_cv_folds, scoring='accuracy'
                )
                # Handle NaN values from failed CV folds (e.g., KNN with small datasets)
                valid_scores = test_scores[~np.isnan(test_scores)]
                if len(valid_scores) > 0:
                    test_accuracy = valid_scores.mean()
                    test_std = valid_scores.std()
                else:
                    test_accuracy = model.score(test_x, test_y)
                    test_std = 0.0
            else:
                test_accuracy = model.score(test_x, test_y)
                test_std = 0.0
            # Log metrics to Azure ML
            run.log(f'{model_name}_training_accuracy', train_accuracy)
            run.log(f'{model_name}_testing_accuracy', test_accuracy)
            run.log('model_type', model_name)
            # Print results
            print(f"Training Accuracy: {train_accuracy:.4f} (+/- {train_std:.4f})")
            print(f"Testing Accuracy: {test_accuracy:.4f} (+/- {test_std:.4f})")
            # Generate detailed classification report for test set
            test_predictions = model.predict(test_x)
            print("\nClassification Report:")
            print(classification_report(test_y, test_predictions))
            # Save model
            model_filename = f'outputs/{model_name.replace(" ", "_")}.pkl'
            with open(model_filename, 'wb') as f:
                pickle.dump(model, f)
            print(f"Model saved: {model_filename}")
            # Store results
            model_results[model_name] = {
                'training_accuracy': train_accuracy,
                'testing_accuracy': test_accuracy,
                'model': model
            }
        except Exception as e:
            print(f"❌ Error training {model_name}: {e}")
            continue
    return model_results
def main():
    """Main training function."""
    print("🚀 Starting occupancy prediction model training...")
    # Parse command line arguments
    args = parse_arguments()
    # Get Azure ML run context
    run = Run.get_context()
    # Download data
    if not download_data(args.account, args.container, args.blob, args.sas):
        return
    # Load and validate data
    df = load_and_validate_data()
    if df is None:
        return
    # Prepare features and targets
    train_x, train_y, test_x, test_y = prepare_features_and_targets(df)
    # Create models
    models = create_models()
    # Train and evaluate models
    results = train_and_evaluate_models(models, train_x, train_y, test_x, test_y, run)
    # Summary
    print(f"\n{'='*60}")
    print("TRAINING SUMMARY")
    print(f"{'='*60}")
    if results:
        best_model = max(results.items(), key=lambda x: x[1]['testing_accuracy'])
        print("Model Performance Comparison:")
        for name, metrics in results.items():
            marker = "🏆" if name == best_model[0] else "📊"
            print(f"  {marker} {name}: {metrics['testing_accuracy']:.4f} (test accuracy)")
        print(f"\n🎯 Best performing model: {best_model[0]} (Test Accuracy: {best_model[1]['testing_accuracy']:.4f})")
        # Log best model info
        run.log('best_model', best_model[0])
        run.log('best_test_accuracy', best_model[1]['testing_accuracy'])
    print("✅ Training completed successfully!")

if __name__ == '__main__':
    main()
Model Training Execution
Modern Training Approach with ScriptRunConfig
We use Azure ML's modern ScriptRunConfig approach instead of the deprecated estimator pattern. This provides:
- Flexibility: Full control over the training environment
- Consistency: Unified approach for all framework types
- Maintainability: Future-proof against SDK changes
- Transparency: Clear separation of environment and execution logic
# Map the blob name produced by the export step for use in the training job
EXPORTED_BLOB_NAME = data_blob_name
print("✅ Variables mapped successfully:")
print(f" Storage Account: {STORAGE_ACCOUNT}")
print(f" Container: {CONTAINER_NAME}")
print(f" Blob: {EXPORTED_BLOB_NAME}")
# Create ScriptRunConfig for modern, flexible training job submission
script_config = ScriptRunConfig(
source_directory=SCRIPT_FOLDER,
script='train.py',
environment=env,
compute_target=compute_target,
arguments=[
'--account', STORAGE_ACCOUNT,
'--container', CONTAINER_NAME,
'--blob', EXPORTED_BLOB_NAME,
'--sas', SAS_TOKEN
]
)
print("Training configuration created successfully")
print(f"Script directory: {SCRIPT_FOLDER}")
print(f"Compute target: {compute_target.name}")
print(f"Environment: {env.name}")
Submit Training Job
Submit the training job to the compute cluster. The job will:
- Environment Setup (~2-5 minutes): Create or reuse Docker environment
- Data Download: Fetch training data from blob storage
- Model Training: Train all four classification models
- Evaluation: Perform cross-validation and generate metrics
- Model Persistence: Save trained models to outputs directory
# Submit the training job
if 'script_config' in locals():
    print("🚀 Submitting training job to Azure ML...")
    # Submit the run
    training_run = exp.submit(config=script_config)
    print("✅ Training job submitted successfully!")
    print(f"Run ID: {training_run.id}")
    print(f"Experiment: {exp.name}")
    print(f"Status: {training_run.get_status()}")
    # Display Azure ML Studio link
    studio_url = training_run.get_portal_url()
    print("\n🔗 Monitor progress in Azure ML Studio:")
    print(f"   {studio_url}")
    # Store run reference for monitoring
    current_run = training_run
else:
    print("❌ Cannot submit job - training configuration not available")
    print("   Please run the configuration cell above first")
Monitor Training Progress
You can monitor the training job in multiple ways:
- Azure ML Studio: Use the link above for a rich web interface
- Notebook Widget: Execute the cell below for inline monitoring
- Programmatic Status: Check status and logs programmatically
Training Timeline:
- Image Creation (first run only): ~5 minutes
- Cluster Scaling: ~2-3 minutes if nodes need to be allocated
- Job Execution: ~2-5 minutes for this dataset size
- Post-processing: ~1 minute to collect outputs
# Monitor training job with real-time output
if 'current_run' in locals():
    print(f"Monitoring training run: {current_run.id}")
    print(f"Current status: {current_run.get_status()}")
    # Wait for completion with live output streaming
    # Set show_output=True to see real-time logs from the training script
    try:
        result = current_run.wait_for_completion(show_output=True)
        print(f"\n{'='*60}")
        print("TRAINING COMPLETED")
        print(f"{'='*60}")
        print(f"Final Status: {result['status']}")
        print(f"Start Time: {result['startTimeUtc']}")
        print(f"End Time: {result['endTimeUtc']}")
        # Display key metrics
        metrics = current_run.get_metrics()
        if metrics:
            print("\n📊 Training Metrics:")
            for key, value in metrics.items():
                if 'accuracy' in key.lower():
                    print(f"  {key}: {value:.4f}")
        # List output files
        files = current_run.get_file_names()
        output_files = [f for f in files if f.startswith('outputs/')]
        if output_files:
            print("\n💾 Generated Model Files:")
            for file in output_files:
                print(f"  {file}")
    except KeyboardInterrupt:
        print("\n⏸️ Monitoring interrupted by user")
        print("   Training continues in background. Check status with: current_run.get_status()")
    except Exception as e:
        print(f"\n❌ Error during monitoring: {e}")
        print(f"   Check run status manually: {current_run.get_status()}")
else:
    print("❌ No active training run found")
    print("   Please submit a training job first")
Retrieve Training Results
After training completes, we can analyze the results and download the best performing model.
# Analyze training results and download best model
if 'current_run' in locals() and current_run.get_status() == 'Completed':
    print("📈 Analyzing training results...")
    # Get all metrics
    metrics = current_run.get_metrics()
    # Extract model performance metrics
    model_accuracies = {}
    for key, value in metrics.items():
        if '_testing_accuracy' in key:
            model_name = key.replace('_testing_accuracy', '')
            model_accuracies[model_name] = value
    if model_accuracies:
        print("\n🏆 Model Performance Summary:")
        sorted_models = sorted(model_accuracies.items(), key=lambda x: x[1], reverse=True)
        for i, (model, accuracy) in enumerate(sorted_models):
            rank_emoji = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else "📊"
            print(f"  {rank_emoji} {model}: {accuracy:.4f}")
        # Download best model
        best_model_name, best_accuracy = sorted_models[0]
        model_filename = f"{best_model_name.replace(' ', '_')}.pkl"
        print(f"\n💾 Downloading best model: {best_model_name}")
        try:
            # Download the best model file
            current_run.download_file(f"outputs/{model_filename}", output_file_path=model_filename)
            print(f"  ✅ Model downloaded: {model_filename}")
            print(f"  🎯 Best accuracy: {best_accuracy:.4f}")
        except Exception as e:
            print(f"  ❌ Download failed: {e}")
    else:
        print("⚠️ No model accuracy metrics found")
    # Display run summary
    print("\n📋 Run Summary:")
    print(f"  Run ID: {current_run.id}")
    print(f"  Experiment: {current_run.experiment.name}")
    print(f"  Status: {current_run.get_status()}")
    print(f"  Portal URL: {current_run.get_portal_url()}")
elif 'current_run' in locals():
    print(f"⏳ Training still in progress. Current status: {current_run.get_status()}")
    print("   Run this cell again after training completes")
else:
    print("❌ No training run available for analysis")
    print("   Please submit and complete a training job first")
Model Deployment and ADX Integration
Using Trained Models in Azure Data Explorer
Azure Data Explorer's Python plugin allows us to run trained models directly within KQL queries. This provides:
- Real-time Inference: Score data as it arrives in ADX
- Scalability: Leverage ADX's distributed architecture
- Integration: Keep ML workflows within your data platform
- Efficiency: Avoid data movement between systems
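Conceptually, the plugin hands each batch of rows to the inline script as a pandas DataFrame named `df` and reads back the DataFrame named `result`. A rough local simulation of that contract (an illustration only - it does not reproduce how ADX actually sandboxes the script):

```python
import pandas as pd

def simulate_adx_python_plugin(rows: pd.DataFrame, script: str) -> pd.DataFrame:
    """Run a script the way the ADX python plugin conceptually does:
    the input table is exposed as `df`, and the script must set `result`."""
    scope = {"df": rows.copy(), "result": None}
    exec(script, scope)
    return scope["result"]

sensor_rows = pd.DataFrame({"Light": [450.0, 0.0], "CO2": [1800.0, 340.0]})
# A toy "model": predict occupied whenever the lights are on
script = "result = df.assign(prediction=df['Light'] > 0)"
print(simulate_adx_python_plugin(sensor_rows, script))
```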
Deployment Options
There are several ways to deploy models for use with ADX:
- Inline Model: Embed the model directly in KQL queries (suitable for small models)
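The inline approach relies on a pickled model surviving a round trip through a plain hex string, which is what makes it storable in an ADX string column. A minimal stdlib sketch, using a stand-in dict rather than a real sklearn estimator (a real estimator pickles the same way):

```python
import pickle

# Stand-in for a trained model object
model = {"features": ["Temperature", "Light"], "threshold": 0.5}

# Serialize to bytes, then to a hex string an ADX string column can hold
hex_model = pickle.dumps(model).hex()

# Later (e.g., inside the python plugin): hex string back to a live object
restored = pickle.loads(bytes.fromhex(hex_model))
print(restored == model)  # → True
```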
Prepare Model for ADX Deployment
We'll create a simple deployment package that includes the model and prediction logic.
# Store the trained model in an ADX table as a hex string (original approach)
print("📦 STORING MODEL IN ADX TABLE")
print("="*60)

# Step 1: Get the downloaded model file
model_files = [f for f in os.listdir('.') if f.endswith('.pkl')]
if model_files:
    model_path = model_files[0]
    print(f"✅ Using model file: {model_path}")
    # Step 2: Serialize model to hex string
    import datetime
    import pandas as pd
    models_tbl = 'ML_Models'
    model_name = 'AML-Occupancy'
    print("\n📊 Converting model to hex string...")
    with open(model_path, 'rb') as handle:
        buf = handle.read()
    # Convert binary to hex string (the key step from the original example)
    smodel = buf.hex()
    print(f"  Model size: {len(buf)} bytes")
    print(f"  Hex string length: {len(smodel)} characters")
    # Create DataFrame for ADX
    now = datetime.datetime.now()
    dfm = pd.DataFrame({
        'name': [model_name],
        'timestamp': [now],
        'model': [smodel]
    })
    print("\n🏗️ Creating ADX table and storing model...")
    # Create the ML_Models table
    create_table_query = f'''
.create table {models_tbl} (
    name: string,
    timestamp: datetime,
    model: string
)
'''
    try:
        %kql create_result << -query create_table_query
        print("✅ ML_Models table created")
    except Exception:
        print("✅ ML_Models table already exists")
    # Store the model in the table
    ingest_query = f'''
.ingest inline into table {models_tbl} <|
{model_name},{now.isoformat()},{smodel}
'''
    try:
        %kql ingest_result << -query ingest_query
        print("✅ Model stored in ADX table!")
        # Verify storage
        verify_query = f'{models_tbl} | project name, timestamp, model_size = strlen(model)'
        %kql verify_result << -query verify_query
        if not verify_result.empty:
            print("✅ Model verification successful!")
            display(verify_result)
    except Exception as e:
        print(f"❌ Storage failed: {e}")
else:
    print("❌ No model file found")
Create ADX Scoring Queries
Create ready-to-use KQL queries that incorporate the trained model for real-time predictions.
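The Python that runs inside the plugin sandbox follows a fixed contract: the incoming rows arrive as a pandas DataFrame named `df`, and the script must assign its output frame to `result`. A local sketch of that contract, using a hypothetical stand-in model rather than the trained AML classifier:

```python
import numpy as np
import pandas as pd

class StubModel:
    """Stand-in for the trained classifier (illustrative only)."""
    def predict(self, X):
        # Hypothetical rule: occupied when the lights are on
        return (X["Light"] > 100).to_numpy()
    def predict_proba(self, X):
        p = np.where(X["Light"] > 100, 0.9, 0.8)
        return np.column_stack([1 - p, p])

# 'df' mimics the rows ADX hands to the plugin
df = pd.DataFrame({
    "Temperature": [21.5, 23.1],
    "Humidity": [27.2, 31.0],
    "Light": [426.0, 0.0],
    "CO2": [721.2, 440.0],
    "HumidityRatio": [0.0048, 0.0051],
})

model = StubModel()
features = ["Temperature", "Humidity", "Light", "CO2", "HumidityRatio"]

# Mirror the plugin contract: extend the input frame and expose it as 'result'
result = df.copy()
result["prediction"] = model.predict(df[features])
result["confidence"] = np.max(model.predict_proba(df[features]), axis=1)

print(result[["Light", "prediction", "confidence"]])
```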
# ADX model scoring query (using the stored model from the table)
print("🎯 CREATING ADX SCORING QUERIES")
print("="*60)

# Use the same names as the storage step
MODEL_TABLE = 'ML_Models'
MODEL_NAME = 'AML-Occupancy'
print("✅ Using stored model:")
print(f" Table: {MODEL_TABLE}")
print(f" Model: {MODEL_NAME}")

# Scoring query that loads the model from the ADX table.
# Note: the python plugin is invoked as a tabular operator via 'evaluate python()';
# the script is passed as a string literal, and scalar inputs are passed through
# the kwargs property bag (exposed to the script as 'kargs').
scoring_query = f'''
let model_str = toscalar(
    {MODEL_TABLE}
    | where name == "{MODEL_NAME}"
    | top 1 by timestamp desc
    | project model
);
OccupancyDetection
| take 5
| evaluate python(typeof(*, prediction:bool, confidence:real),
    'import pickle\\n'
    'import numpy as np\\n'
    '\\n'
    '# Load the model from the hex string stored in ADX\\n'
    'model = pickle.loads(bytes.fromhex(kargs["smodel"]))\\n'
    '\\n'
    '# Make predictions\\n'
    'features = ["Temperature", "Humidity", "Light", "CO2", "HumidityRatio"]\\n'
    'result = df\\n'
    'result["prediction"] = model.predict(df[features])\\n'
    '\\n'
    '# Add confidence if available\\n'
    'if hasattr(model, "predict_proba"):\\n'
    '    result["confidence"] = np.max(model.predict_proba(df[features]), axis=1)\\n'
    'else:\\n'
    '    result["confidence"] = 1.0\\n',
    pack('smodel', model_str))
| project Timestamp, Temperature, Humidity, Light, CO2, Occupancy, prediction, confidence
'''

print("\n📋 Scoring query created!")
print(" This query loads the model from the ADX table and scores new data")

# Store for testing
SCORING_QUERY = scoring_query
Testing the Scoring Query
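The stored function below decodes the hex model with `binascii.unhexlify`, while the earlier cells used `bytes.fromhex`; the two are interchangeable for this payload, as a quick stdlib check shows:

```python
import binascii
import pickle

payload = pickle.dumps({"k": 1})  # stand-in for the pickled model
hex_str = payload.hex()

# Both decoders recover the identical byte string
assert binascii.unhexlify(hex_str) == bytes.fromhex(hex_str) == payload
print("hex decoders agree")
```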
# ADX scoring via a stored function (the original tutorial's approach)
print("🎯 USING ORIGINAL ADX SCORING APPROACH")
print("="*60)

# The original tutorial wraps 'evaluate python()' in a stored function and
# applies it with 'invoke', which keeps the scoring logic reusable from KQL
scoring_from_table_query = r'''
let classify_sf=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
let code =
'import pickle\n'
'import binascii\n'
'\n'
'smodel = kargs["smodel"]\n'
'features_cols = kargs["features_cols"]\n'
'pred_col = kargs["pred_col"]\n'
'bmodel = binascii.unhexlify(smodel)\n'
'clf1 = pickle.loads(bmodel)\n'
'df1 = df[features_cols]\n'
'predictions = clf1.predict(df1)\n'
'\n'
'result = df\n'
'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
'\n'
;
samples | evaluate python(typeof(*), code, kwargs)
};
OccupancyDetection
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf(ML_Models, 'AML-Occupancy', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize n=count() by Occupancy, pred_Occupancy
'''
print("📋 Original scoring query recreated!")
print(" Uses: 'evaluate python()' — the tabular-operator form in which the plugin is invoked")
print(" Uses: a stored function applied with 'invoke'")
print(" Uses: binascii.unhexlify() for hex decoding (equivalent to bytes.fromhex())")
print("\n🧪 Testing the stored-function approach...")

scoring_results = None
try:
    %kql scoring_results << -query scoring_from_table_query
    if scoring_results is not None and not scoring_results.empty:
        print("✅ SUCCESS! Stored-function scoring works!")
        print(f" Generated confusion matrix with {len(scoring_results)} rows")
        print("\n📊 Confusion Matrix Results:")
        display(scoring_results)

        # Calculate accuracy from the confusion matrix
        total_predictions = scoring_results['n'].sum()
        correct_predictions = scoring_results[scoring_results['Occupancy'] == scoring_results['pred_Occupancy']]['n'].sum()
        accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
        print("\n🎯 Model Performance:")
        print(f" Total predictions: {total_predictions}")
        print(f" Correct predictions: {correct_predictions}")
        print(f" Accuracy: {accuracy:.3f}")

        print("\n🎉 COMPLETE SUCCESS!")
        print(" ✅ Model training: Complete")
        print(" ✅ Model storage in ADX: Complete")
        print(" ✅ Model scoring in ADX: Complete")
        print(" ✅ End-to-end pipeline: Working!")
    else:
        print("❌ Scoring query returned no results")
except Exception as e:
    print(f"❌ Scoring query failed: {e}")

    # Fall back to a simpler query to isolate the problem
    print("\n🔧 Trying simplified version...")
    simple_scoring_query = r'''
let model_str = toscalar(ML_Models | where name == 'AML-Occupancy' | top 1 by timestamp desc | project model);
OccupancyDetection
| take 3
| extend model_available = isnotnull(model_str)
| project Timestamp, Temperature, Occupancy, model_available
'''
    try:
        %kql simple_scoring << -query simple_scoring_query
        if 'simple_scoring' in locals() and not simple_scoring.empty:
            print("✅ Model retrieval works!")
            display(simple_scoring)
        else:
            print("❌ Model retrieval failed")
    except Exception as simple_e:
        print(f"❌ Simple test failed: {simple_e}")
print(f"\n{'='*60}")
print("KEY INSIGHT")
print("="*60)
print("The working approach:")
print(" • invokes the plugin as a tabular operator: '| evaluate python()'")
print(" • wraps the call in a stored function applied with 'invoke'")
print(" • decodes the hex model with binascii.unhexlify() (bytes.fromhex() is equivalent)")
print("\nThe python plugin is only valid as 'evaluate python()'; it cannot be called as a scalar function inside 'extend'.")
Summary and Next Steps
What We've Accomplished
This tutorial demonstrated a complete end-to-end ML workflow using Azure Machine Learning and Azure Data Explorer:
✅ Data Management: Stored and queried sensor data efficiently in ADX
✅ Model Training: Trained multiple classification models using Azure ML compute
✅ Model Evaluation: Compared performance across different algorithms
✅ Model Deployment: Deployed models for real-time inference in ADX
✅ Integration: Seamlessly connected ADX and Azure ML workflows
Key Takeaways
- Azure Data Explorer provides powerful capabilities for both data storage and ML inference
- Azure Machine Learning offers managed, scalable compute for model training
- Python Plugin in ADX enables sophisticated data processing and ML operations
- Modern SDK patterns like ScriptRunConfig provide flexibility and maintainability