Running 'Machine Learning' off Azure Data Explorer

Example updated and adapted from Mohammad Ghodratigohar's 2022 YouTube tutorial for the current version of Azure Data Explorer (updated July 2025).

The full notebook is available for download at:  

https://github.com/LaurieRhodes/PUBLIC-Scripts/tree/main/Jupyter_ADX

This notebook demonstrates how to integrate machine learning into Azure Data Explorer data processing.

Overview

This notebook demonstrates the complete end-to-end machine learning workflow using Azure Machine Learning (AML) and Azure Data Explorer (ADX) for binary classification of room occupancy. The tutorial covers:

  • Data Storage: Utilizing ADX for efficient time-series data storage and querying
  • Model Training: Training multiple classification models using Azure ML compute clusters
  • Model Deployment: Deploying the trained model for inference using ADX's Python plugin
  • Integration: Seamless integration between ADX and AML for operationalized ML workflows

Business Context

Room occupancy prediction is a critical component in:

  • Smart Building Management: Optimizing HVAC systems for energy efficiency
  • Space Utilization: Understanding building usage patterns
  • Security Systems: Detecting unauthorized access or anomalous occupancy patterns
  • IoT Analytics: Processing sensor data at scale

Dataset Description

The occupancy detection dataset contains environmental sensor readings from an office space:

  • Temperature: Room temperature (°C)
  • Humidity: Relative humidity (%)
  • Light: Illumination levels (Lux)
  • CO2: Carbon dioxide concentration (ppm)
  • HumidityRatio: Derived humidity metric
  • Occupancy: Binary target variable (True/False)
  • Test: Train/test split indicator
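
The HumidityRatio column is derived from Temperature and Humidity in the original UCI dataset. Purely as an illustration, a standard psychrometric approximation (the Magnus formula; the helper and its constants below are textbook values, not taken from the dataset) produces values in the same range as the sample data:

```python
import math

def humidity_ratio(temp_c: float, rel_humidity_pct: float,
                   pressure_pa: float = 101325.0) -> float:
    """Approximate humidity ratio (kg water vapour per kg dry air)."""
    # Saturation vapour pressure in Pa (Magnus approximation)
    p_sat = 610.94 * math.exp(17.625 * temp_c / (temp_c + 243.04))
    # Partial pressure of water vapour at the given relative humidity
    p_w = (rel_humidity_pct / 100.0) * p_sat
    # Mass ratio of water vapour to dry air (0.622 is the molar mass ratio)
    return 0.622 * p_w / (pressure_pa - p_w)

# For 22.5 °C at 32% RH the result lands close to the 0.0055
# HumidityRatio seen in the occupied sample rows
print(round(humidity_ratio(22.5, 32.0), 4))
```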

 

Prerequisites and Setup

Azure Data Explorer Requirements

Before proceeding, ensure your ADX cluster is properly configured:

  1. Enable Python Plugin: The Python plugin must be enabled on your ADX cluster. This is required for model inference.
    • Navigate to your ADX cluster in the Azure portal
    • Go to Configuration → Python plugin
    • Enable the plugin (currently tested with the Python 3.11.7 DL image)
    • Documentation: Python Plugin Guide
  2. Create Sample Dataset: Execute the following KQL commands in your ADX cluster:
// Create the OccupancyDetection table with proper schema
.create table OccupancyDetection (
    Timestamp: datetime,
    Temperature: real,
    Humidity: real,
    Light: real,
    CO2: real,
    HumidityRatio: real,
    Occupancy: bool,
    Test: bool
)

// Clear existing data and insert sample data with balanced train/test split
.clear table OccupancyDetection data

// Insert sample data with clear occupied vs unoccupied patterns
.ingest inline into table OccupancyDetection <|
2015-02-18T09:16:00.0000000Z,22.5,32.0,450.0,1800.0,0.0055,true,false
2015-02-18T09:17:00.0000000Z,23.0,33.5,480.0,1950.0,0.0058,true,false
2015-02-18T09:18:00.0000000Z,22.8,32.8,465.0,1875.0,0.0057,true,false
2015-02-18T09:19:00.0000000Z,23.2,34.0,490.0,2000.0,0.0059,true,false
2015-02-18T09:20:00.0000000Z,22.9,33.2,470.0,1920.0,0.0058,true,false
2015-02-18T10:16:00.0000000Z,18.5,22.0,0.0,350.0,0.0032,false,false
2015-02-18T10:17:00.0000000Z,18.2,21.5,0.0,340.0,0.0031,false,false
2015-02-18T10:18:00.0000000Z,18.0,21.0,0.0,330.0,0.0030,false,false
2015-02-18T10:19:00.0000000Z,18.3,21.8,0.0,345.0,0.0032,false,false
2015-02-18T10:20:00.0000000Z,18.1,21.3,0.0,335.0,0.0031,false,false
2015-02-18T11:16:00.0000000Z,22.7,32.5,455.0,1850.0,0.0056,true,true
2015-02-18T11:17:00.0000000Z,23.1,33.8,475.0,1900.0,0.0058,true,true
2015-02-18T11:18:00.0000000Z,22.6,32.2,460.0,1820.0,0.0055,true,true
2015-02-18T12:16:00.0000000Z,18.4,22.2,0.0,355.0,0.0033,false,true
2015-02-18T12:17:00.0000000Z,18.0,21.2,0.0,325.0,0.0030,false,true
2015-02-18T12:18:00.0000000Z,18.6,22.5,0.0,360.0,0.0034,false,true
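
As a quick sanity check, the same inline rows can be parsed locally with the standard library to confirm the class balance and train/test split before querying ADX (a sketch using the values from the `.ingest` command above):

```python
import csv
import io

# The 16 inline sample rows from the .ingest command above
SAMPLE = """\
2015-02-18T09:16:00.0000000Z,22.5,32.0,450.0,1800.0,0.0055,true,false
2015-02-18T09:17:00.0000000Z,23.0,33.5,480.0,1950.0,0.0058,true,false
2015-02-18T09:18:00.0000000Z,22.8,32.8,465.0,1875.0,0.0057,true,false
2015-02-18T09:19:00.0000000Z,23.2,34.0,490.0,2000.0,0.0059,true,false
2015-02-18T09:20:00.0000000Z,22.9,33.2,470.0,1920.0,0.0058,true,false
2015-02-18T10:16:00.0000000Z,18.5,22.0,0.0,350.0,0.0032,false,false
2015-02-18T10:17:00.0000000Z,18.2,21.5,0.0,340.0,0.0031,false,false
2015-02-18T10:18:00.0000000Z,18.0,21.0,0.0,330.0,0.0030,false,false
2015-02-18T10:19:00.0000000Z,18.3,21.8,0.0,345.0,0.0032,false,false
2015-02-18T10:20:00.0000000Z,18.1,21.3,0.0,335.0,0.0031,false,false
2015-02-18T11:16:00.0000000Z,22.7,32.5,455.0,1850.0,0.0056,true,true
2015-02-18T11:17:00.0000000Z,23.1,33.8,475.0,1900.0,0.0058,true,true
2015-02-18T11:18:00.0000000Z,22.6,32.2,460.0,1820.0,0.0055,true,true
2015-02-18T12:16:00.0000000Z,18.4,22.2,0.0,355.0,0.0033,false,true
2015-02-18T12:17:00.0000000Z,18.0,21.2,0.0,325.0,0.0030,false,true
2015-02-18T12:18:00.0000000Z,18.6,22.5,0.0,360.0,0.0034,false,true
"""

rows = list(csv.reader(io.StringIO(SAMPLE)))
occupied = sum(1 for r in rows if r[6] == 'true')   # Occupancy column
test_rows = sum(1 for r in rows if r[7] == 'true')  # Test column
print(f"{len(rows)} rows, {occupied} occupied, {test_rows} in the test split")
```

This reports 16 rows with 8 occupied and 6 in the test split, so both classes appear on each side of the split.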

Azure Machine Learning Requirements

  • An Azure ML workspace
  • A compute cluster for training (created automatically if it does not exist)
  • Appropriate Azure Storage account with blob container
  • Required Python packages (installed below)

Environment Setup and Package Installation

Install the required packages, then display version information for troubleshooting.

import sys
import azureml.core
from azureml.core import Workspace, Experiment, ScriptRunConfig
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies

# Install KqlMagic with quiet output to avoid verbose logs
!pip install Kqlmagic --no-cache-dir --upgrade -q

# Display version information for troubleshooting
print(f"Python Version: {sys.version}")
print(f"Azure ML SDK Version: {azureml.core.VERSION}")
print("Environment setup complete.")

 

Azure ML Workspace Configuration

Workspace Creation

Choose one of the following approaches based on your scenario:

  1. Create New Workspace: Use the first cell if you need to create a new workspace
  2. Use Existing Workspace: Use the second cell if connecting to an existing workspace
# Option 1: Create a new Azure ML workspace
# Update these values with your Azure subscription details
SUBSCRIPTION_ID = "your-subscription-id"
RESOURCE_GROUP = "your-resource-group"
WORKSPACE_NAME = "your-workspace-name"
LOCATION = "your-azure-region"  # e.g., 'eastus', 'westeurope'

# Uncomment the following lines to create a new workspace
# ws = Workspace.create(
#     name=WORKSPACE_NAME,
#     subscription_id=SUBSCRIPTION_ID,
#     resource_group=RESOURCE_GROUP,
#     location=LOCATION,
#     exist_ok=True,
#     show_output=True
# )
# 
# # Save workspace configuration for future use
# ws.write_config()

print("Workspace creation cell - modify and uncomment as needed")

 

# Option 2: Connect to existing workspace using config.json
# This assumes you have a config.json file in the current directory
# or you've previously run ws.write_config()

try:
    ws = Workspace.from_config()
    print(f"Connected to workspace: {ws.name}")
    print(f"Location: {ws.location}")
    print(f"Resource Group: {ws.resource_group}")
    print(f"Subscription ID: {ws.subscription_id}")
except Exception as e:
    print(f"Error connecting to workspace: {e}")
    print("Please ensure config.json exists or create workspace using Option 1")

Experiment and Environment Setup

Create Experiment

Experiments in Azure ML provide organization and tracking for related runs. All training jobs will be tracked under this experiment.

# Create experiment for tracking all occupancy prediction runs
EXPERIMENT_NAME = "occupancy-prediction-adx"
exp = Experiment(workspace=ws, name=EXPERIMENT_NAME)

# Create custom environment with modern, stable package versions
# This replaces the legacy approach of hardcoded package versions
env = Environment(name="occupancy-detection-env-modern")

# Define conda dependencies with current best practices
conda_deps = CondaDependencies()

# Use Python 3.9 for broad compatibility and stability
conda_deps.add_conda_package("python=3.9")

# Core data science packages with minimum versions for compatibility
conda_deps.add_conda_package("pandas>=1.3.0")
conda_deps.add_conda_package("scikit-learn>=1.0.0")
conda_deps.add_conda_package("numpy>=1.21.0")

# Azure ML specific packages
conda_deps.add_pip_package("azureml-defaults")
conda_deps.add_pip_package("azure-storage-blob>=12.0.0")

# Assign dependencies to environment
env.python.conda_dependencies = conda_deps

print(f"Experiment '{EXPERIMENT_NAME}' created successfully")
print("Custom environment configured with modern package versions")


Compute Infrastructure Setup

Create or Connect to Compute Cluster

Azure ML Compute provides managed, scalable compute for training. The cluster will:

  • Auto-scale: Start with minimum nodes and scale up as needed
  • Cost-efficient: Scale down to minimum nodes when idle
  • Managed: Azure handles OS updates, driver installation, etc.

Note: Initial cluster creation takes approximately 5 minutes. Subsequent runs reuse the existing cluster.

# Compute cluster configuration
COMPUTE_NAME = "cpu-cluster"
VM_SIZE = "STANDARD_D2_V2"  # 2 cores, 7GB RAM - suitable for small ML workloads
MIN_NODES = 0  # Cost optimization: scale to zero when idle
MAX_NODES = 2  # Limit maximum nodes to control costs

# Check if compute target already exists
if COMPUTE_NAME in ws.compute_targets:
    compute_target = ws.compute_targets[COMPUTE_NAME]
    if isinstance(compute_target, AmlCompute):
        print(f"Found existing compute target: {COMPUTE_NAME}")
        print(f"VM Size: {compute_target.vm_size}")
        print(f"Current Nodes: {compute_target.get_status().current_node_count}")
    else:
        print(f"Compute target {COMPUTE_NAME} exists but is not AmlCompute type")
else:
    print(f"Creating new compute target: {COMPUTE_NAME}")
    print(f"VM Size: {VM_SIZE}, Min Nodes: {MIN_NODES}, Max Nodes: {MAX_NODES}")
    
    # Create compute cluster configuration
    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size=VM_SIZE,
        min_nodes=MIN_NODES,
        max_nodes=MAX_NODES,
        idle_seconds_before_scaledown=300  # Scale down after 5 minutes of inactivity
    )
    
    # Create the compute target
    compute_target = ComputeTarget.create(ws, COMPUTE_NAME, provisioning_config)
    
    # Wait for completion with timeout
    compute_target.wait_for_completion(
        show_output=True,
        min_node_count=None,
        timeout_in_minutes=10
    )
    
    print("Compute cluster created successfully!")

 

Data Exploration with Azure Data Explorer

KqlMagic Configuration

KqlMagic enables seamless integration between Jupyter notebooks and Azure Data Explorer. The configuration below addresses common connection issues.

# Configure KqlMagic to prevent common connection errors
# These settings optimize the interaction with ADX
%config Kqlmagic.auto_dataframe = True
%config Kqlmagic.short_errors = True
%config Kqlmagic.display_limit = 50

# Set environment variable for Azure ML compatibility
import os
os.environ['KQLMAGIC_AZUREML_COMPUTE'] = 'ml.azure.com'

# Reload the extension to apply configuration
%reload_ext Kqlmagic

print("KqlMagic configured successfully")

Connect to Azure Data Explorer

Important: Update the connection string below with your ADX cluster details:

# Update these values with your ADX cluster details
ADX_CLUSTER = "my-adx-cluster.australiasoutheast.kusto.windows.net"
ADX_DATABASE = "my-adx-database-security"

# Connect to Azure Data Explorer
# This will prompt for device authentication
%kql azureDataExplorer://code;cluster='{ADX_CLUSTER}';database='{ADX_DATABASE}'

print(f"Connected to ADX cluster: {ADX_CLUSTER}")
print(f"Database: {ADX_DATABASE}")
# Query the occupancy detection dataset
# Store results in 'df' variable for further analysis
%kql df << OccupancyDetection

# Display dataset information
print(f"Dataset shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

# Show first few records
print("\nFirst 5 records:")
display(df.head())

# Basic data quality checks
print("\nData quality summary:")
print(f"Total records: {len(df)}")
print(f"Training records (Test=False): {len(df[df['Test'] == False])}")
print(f"Testing records (Test=True): {len(df[df['Test'] == True])}")
print(f"Occupied records: {len(df[df['Occupancy'] == True])}")
print(f"Unoccupied records: {len(df[df['Occupancy'] == False])}")

# Check for missing values
missing_values = df.isnull().sum()
if missing_values.sum() > 0:
    print("\nMissing values detected:")
    print(missing_values[missing_values > 0])
else:
    print("\nNo missing values detected.")

print("\nData exploration complete.")

Data Export to Azure Storage

Storage Configuration

To train models in Azure ML, we need to export data from ADX to Azure Blob Storage. This approach provides:

  • Scalability: Handle large datasets efficiently
  • Integration: Seamless connection between ADX and AML
  • Versioning: Track different versions of training data

Storage Setup Instructions:

  1. Use the storage account automatically created with your AML workspace, or create a new one
  2. Create a blob container (e.g., 'ml-data')
  3. Generate a SAS token with read/write permissions
  4. You can use Azure Storage Explorer for easy management
# Azure Storage configuration
# Update these values with your storage account details
STORAGE_ACCOUNT = "your-storage-account-name"
CONTAINER_NAME = "your-container-name"
SAS_TOKEN = "?sp=your-token"  # Your SAS token starting with ?

# Validate configuration
if STORAGE_ACCOUNT == "your-storage-account-name":
    print("⚠️  Please update the storage configuration with your actual values")
    print("   - STORAGE_ACCOUNT: Your Azure Storage account name")
    print("   - CONTAINER_NAME: Your blob container name")
    print("   - SAS_TOKEN: Your container SAS token with read/write permissions")
else:
    print(f"✅ Storage configured: {STORAGE_ACCOUNT}/{CONTAINER_NAME}")
    
    # Construct blob URI for ADX export
    blob_container_uri = f"https://{STORAGE_ACCOUNT}.blob.core.windows.net/{CONTAINER_NAME}{SAS_TOKEN}"
    print(f"Blob URI configured for data export")

 

# Export data from ADX to Azure Blob Storage
# This creates a CSV file that Azure ML can access

if 'blob_container_uri' not in locals():
    print("❌ Please configure storage settings in the previous cell first")
else:
    # Construct the ADX export command
    export_query = f".export to csv (h@'{blob_container_uri}') with(includeHeaders=all) <| OccupancyDetection"
    
    print("Exporting data from ADX to blob storage...")
    print(f"Export command: {export_query[:100]}...")
    
    # Execute the export
    %kql export_result << -query export_query
    
    # Validate export results
    if not export_result.empty and 'Path' in export_result.columns:
        # Extract the blob file name from the full path
        blob_path = export_result['Path'].iloc[0]
        data_blob_name = blob_path.split('/')[-1]
        
        print(f"✅ Export successful!")
        print(f"   Records exported: {export_result['NumRecords'].iloc[0]}")
        print(f"   File size: {export_result['SizeInBytes'].iloc[0]} bytes")
        print(f"   Blob name: {data_blob_name}")
        
        # Store for use in training script
        EXPORTED_BLOB_NAME = data_blob_name
    else:
        print("❌ Export failed - check table data and storage permissions")
        print("   Ensure the OccupancyDetection table contains data")
        print("   Verify SAS token has write permissions")

 

Data Validation

Verify the exported data by downloading and inspecting it locally.

# Validate exported data by downloading and inspecting
import pandas as pd
from azure.storage.blob import BlobServiceClient

if 'EXPORTED_BLOB_NAME' not in locals():
    print("❌ No exported blob found. Please run the export cell above first.")
else:
    try:
        # Create blob service client
        blob_service_client = BlobServiceClient(
            account_url=f"https://{STORAGE_ACCOUNT}.blob.core.windows.net",
            credential=SAS_TOKEN.lstrip('?')  # Remove the ? prefix
        )
        
        # Download blob to local file
        blob_client = blob_service_client.get_blob_client(
            container=CONTAINER_NAME,
            blob=EXPORTED_BLOB_NAME
        )
        
        # Download and read the CSV
        with open('validation_data.csv', 'wb') as download_file:
            download_file.write(blob_client.download_blob().readall())
        
        # Load and validate the data
        validation_df = pd.read_csv('validation_data.csv')
        
        print("✅ Data validation successful!")
        print(f"   Downloaded file shape: {validation_df.shape}")
        print(f"   Columns: {validation_df.columns.tolist()}")
        
        # Display sample of exported data
        print("\nSample of exported data:")
        display(validation_df.tail(5))
        
        print("Data export validation complete.")
        
    except Exception as e:
        print(f"❌ Validation failed: {e}")
        print("   Check storage account configuration and permissions")

Model Training Setup

Create Training Script Directory

Azure ML requires all training files to be in a directory that will be uploaded to the compute cluster.

# Create directory for training script and related files
import os

SCRIPT_FOLDER = os.path.join(os.getcwd(), "training_scripts")
os.makedirs(SCRIPT_FOLDER, exist_ok=True)

print(f"Training script directory created: {SCRIPT_FOLDER}")

Training Script Development

The training script below implements:

  • Data Loading: Downloads data from Azure Blob Storage
  • Data Preprocessing: Handles train/test splits and data validation
  • Model Training: Trains multiple classification algorithms
  • Model Evaluation: Cross-validation with appropriate fold sizing
  • Model Persistence: Saves trained models for deployment
  • Experiment Tracking: Logs metrics to Azure ML

Algorithms Evaluated:

  • Decision Tree Classifier
  • Logistic Regression
  • K-Nearest Neighbors
  • Gaussian Naive Bayes
%%writefile "$SCRIPT_FOLDER/train.py"

"""
Azure ML Training Script for Room Occupancy Prediction

This script trains multiple classification models on environmental sensor data
to predict room occupancy. It includes comprehensive error handling, 
data validation, and experiment tracking.

Authors: Adapted from Mohammad Ghodratigohar's original example
Date: June 2025
"""

import pickle
import argparse
import pandas as pd
import numpy as np
from azure.storage.blob import BlobServiceClient

# Scikit-learn imports
from sklearn import tree
from sklearn import neighbors
from sklearn import naive_bayes
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.metrics import classification_report, confusion_matrix

# Azure ML imports
from azureml.core import Run
import os

def parse_arguments():
    """Parse command line arguments."""
    parser = argparse.ArgumentParser(description='Train occupancy prediction models')
    parser.add_argument('--account', type=str, required=True, 
                       help='Azure Storage account name')
    parser.add_argument('--container', type=str, required=True,
                       help='Blob container name')
    parser.add_argument('--blob', type=str, required=True,
                       help='Blob file name')
    parser.add_argument('--sas', type=str, required=True,
                       help='SAS token for blob access')
    return parser.parse_args()

def download_data(storage_account, container_name, blob_name, sas_token):
    """Download training data from Azure Blob Storage."""
    try:
        print(f"Downloading data from blob: {blob_name}")
        
        # Create blob service client
        blob_service_client = BlobServiceClient(
            account_url=f"https://{storage_account}.blob.core.windows.net",
            credential=sas_token.lstrip('?')
        )
        
        # Download blob
        blob_client = blob_service_client.get_blob_client(
            container=container_name,
            blob=blob_name
        )
        
        with open('training_data.csv', 'wb') as download_file:
            download_file.write(blob_client.download_blob().readall())
        
        print("✅ Data download successful")
        return True
        
    except Exception as e:
        print(f"❌ Data download failed: {e}")
        return False

def load_and_validate_data():
    """Load CSV data and perform validation checks."""
    try:
        df = pd.read_csv('training_data.csv')
        
        print(f"Dataset loaded: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")
        
        # Validate required columns
        required_columns = ['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio', 'Occupancy', 'Test']
        missing_columns = [col for col in required_columns if col not in df.columns]
        
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")
        
        # Check for missing values
        if df.isnull().sum().sum() > 0:
            print("⚠️  Missing values detected:")
            print(df.isnull().sum())
        
        # Display data distribution
        print(f"\nData distribution:")
        print(f"  Total records: {len(df)}")
        print(f"  Occupied: {len(df[df['Occupancy'] == True])}")
        print(f"  Unoccupied: {len(df[df['Occupancy'] == False])}")
        print(f"  Training records: {len(df[df['Test'] == False])}")
        print(f"  Testing records: {len(df[df['Test'] == True])}")
        
        return df
        
    except Exception as e:
        print(f"❌ Data validation failed: {e}")
        return None

def prepare_features_and_targets(df):
    """Prepare feature matrices and target vectors for training."""
    feature_columns = ['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio']
    
    # Check if we have proper train/test split
    train_available = (df['Test'] == False).any()
    test_available = (df['Test'] == True).any()
    
    if not train_available or not test_available:
        print("⚠️  No proper train/test split found. Creating manual split (70/30)...")
        n_train = int(0.7 * len(df))
        train_data = df.iloc[:n_train].copy()
        test_data = df.iloc[n_train:].copy()
    else:
        train_data = df[df['Test'] == False].copy()
        test_data = df[df['Test'] == True].copy()
    
    # Prepare training data
    train_x = train_data[feature_columns]
    train_y = train_data['Occupancy']
    
    # Prepare testing data
    test_x = test_data[feature_columns]
    test_y = test_data['Occupancy']
    
    print(f"\nData split summary:")
    print(f"  Training samples: {len(train_x)}")
    print(f"  Testing samples: {len(test_x)}")
    
    return train_x, train_y, test_x, test_y

def create_models():
    """Create and configure classification models."""
    models = {
        'Decision Tree': tree.DecisionTreeClassifier(random_state=42),
        'Logistic Regression': LogisticRegression(
            solver='liblinear', max_iter=1000, random_state=42
        ),
        'K Nearest Neighbors': neighbors.KNeighborsClassifier(n_neighbors=3),
        'Naive Bayes': naive_bayes.GaussianNB()
    }
    return models

def train_and_evaluate_models(models, train_x, train_y, test_x, test_y, run):
    """Train models and evaluate performance."""
    
    # Create outputs directory for model persistence
    os.makedirs('outputs', exist_ok=True)
    
    # Calculate appropriate CV folds based on dataset size
    min_samples_per_class = min(train_y.sum(), len(train_y) - train_y.sum())
    train_cv_folds = min(5, max(2, min_samples_per_class))
    
    test_min_samples = min(test_y.sum(), len(test_y) - test_y.sum())
    test_cv_folds = min(3, max(2, test_min_samples))
    
    print(f"\nUsing {train_cv_folds} folds for training CV, {test_cv_folds} folds for testing CV")
    
    model_results = {}
    
    for model_name, model in models.items():
        print(f"\n{'='*50}")
        print(f"Training {model_name}...")
        print(f"{'='*50}")
        
        try:
            # Train the model
            model.fit(train_x, train_y)
            
            # Training accuracy with cross-validation
            if len(train_x) >= train_cv_folds:
                train_scores = cross_val_score(
                    model, train_x, train_y, 
                    cv=train_cv_folds, scoring='accuracy'
                )
                train_accuracy = train_scores.mean()
                train_std = train_scores.std()
            else:
                train_accuracy = model.score(train_x, train_y)
                train_std = 0.0
            
            # Testing accuracy with cross-validation
            if len(test_x) >= test_cv_folds:
                test_scores = cross_val_score(
                    model, test_x, test_y,
                    cv=test_cv_folds, scoring='accuracy'
                )
                # Handle NaN values from failed CV folds (e.g., KNN with small datasets)
                valid_scores = test_scores[~np.isnan(test_scores)]
                if len(valid_scores) > 0:
                    test_accuracy = valid_scores.mean()
                    test_std = valid_scores.std()
                else:
                    test_accuracy = model.score(test_x, test_y)
                    test_std = 0.0
            else:
                test_accuracy = model.score(test_x, test_y)
                test_std = 0.0
            
            # Log metrics to Azure ML
            run.log(f'{model_name}_training_accuracy', train_accuracy)
            run.log(f'{model_name}_testing_accuracy', test_accuracy)
            run.log('model_type', model_name)
            
            # Print results
            print(f"Training Accuracy: {train_accuracy:.4f} (+/- {train_std:.4f})")
            print(f"Testing Accuracy:  {test_accuracy:.4f} (+/- {test_std:.4f})")
            
            # Generate detailed classification report for test set
            test_predictions = model.predict(test_x)
            print(f"\nClassification Report:")
            print(classification_report(test_y, test_predictions))
            
            # Save model
            model_filename = f'outputs/{model_name.replace(" ", "_")}.pkl'
            with open(model_filename, 'wb') as f:
                pickle.dump(model, f)
            
            print(f"Model saved: {model_filename}")
            
            # Store results
            model_results[model_name] = {
                'training_accuracy': train_accuracy,
                'testing_accuracy': test_accuracy,
                'model': model
            }
            
        except Exception as e:
            print(f"❌ Error training {model_name}: {e}")
            continue
    
    return model_results

def main():
    """Main training function."""
    print("🚀 Starting occupancy prediction model training...")
    
    # Parse command line arguments
    args = parse_arguments()
    
    # Get Azure ML run context
    run = Run.get_context()
    
    # Download data
    if not download_data(args.account, args.container, args.blob, args.sas):
        return
    
    # Load and validate data
    df = load_and_validate_data()
    if df is None:
        return
    
    # Prepare features and targets
    train_x, train_y, test_x, test_y = prepare_features_and_targets(df)
    
    # Create models
    models = create_models()
    
    # Train and evaluate models
    results = train_and_evaluate_models(models, train_x, train_y, test_x, test_y, run)
    
    # Summary
    print(f"\n{'='*60}")
    print("TRAINING SUMMARY")
    print(f"{'='*60}")
    
    if results:
        best_model = max(results.items(), key=lambda x: x[1]['testing_accuracy'])
        
        print("Model Performance Comparison:")
        for name, metrics in results.items():
            marker = "🏆" if name == best_model[0] else "📊"
            print(f"  {marker} {name}: {metrics['testing_accuracy']:.4f} (test accuracy)")
        
        print(f"\n🎯 Best performing model: {best_model[0]} (Test Accuracy: {best_model[1]['testing_accuracy']:.4f})")
        
        # Log best model info
        run.log('best_model', best_model[0])
        run.log('best_test_accuracy', best_model[1]['testing_accuracy'])
    
    print("✅ Training completed successfully!")

if __name__ == '__main__':
    main()
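
Before submitting the script to the cluster, its core train-and-score loop can be smoke-tested locally on synthetic sensor-like data (a sketch; the feature means loosely follow the occupied vs unoccupied sample rows, not the real dataset):

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

rng = np.random.default_rng(42)

# Synthetic stand-ins for [Temperature, Humidity, Light, CO2, HumidityRatio]
occupied = rng.normal([22.8, 33.0, 470.0, 1900.0, 0.0057], 0.5, size=(50, 5))
unoccupied = rng.normal([18.2, 21.5, 0.0, 340.0, 0.0031], 0.5, size=(50, 5))

X = np.vstack([occupied, unoccupied])
y = np.array([1] * 50 + [0] * 50)

# Same estimator configuration as create_models() in train.py
model = LogisticRegression(solver='liblinear', max_iter=1000, random_state=42)
scores = cross_val_score(model, X, y, cv=5, scoring='accuracy')
print(f"Local smoke-test CV accuracy: {scores.mean():.3f}")
```

Because the synthetic classes are cleanly separated, accuracy should be near perfect; the point is only to confirm the fit/score plumbing before paying for cluster time.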

Model Training Execution

Modern Training Approach with ScriptRunConfig

We use Azure ML's modern ScriptRunConfig approach instead of the deprecated estimator pattern. This provides:

  • Flexibility: Full control over the training environment
  • Consistency: Unified approach for all framework types
  • Maintainability: Future-proof against SDK changes
  • Transparency: Clear separation of environment and execution logic
# Map the exported blob name for the training job
# (requires the ADX export cell above to have completed successfully)
if 'data_blob_name' not in locals():
    raise NameError("data_blob_name is not set - run the ADX export cell first")

EXPORTED_BLOB_NAME = data_blob_name

print("✅ Variables mapped successfully:")
print(f"   Storage Account: {STORAGE_ACCOUNT}")
print(f"   Container: {CONTAINER_NAME}")
print(f"   Blob: {EXPORTED_BLOB_NAME}")

# Create ScriptRunConfig for modern, flexible training job submission
script_config = ScriptRunConfig(
    source_directory=SCRIPT_FOLDER,
    script='train.py',
    environment=env,
    compute_target=compute_target,
    arguments=[
        '--account', STORAGE_ACCOUNT,
        '--container', CONTAINER_NAME,
        '--blob', EXPORTED_BLOB_NAME,
        '--sas', SAS_TOKEN
    ]
)
    
print("Training configuration created successfully")
print(f"Script directory: {SCRIPT_FOLDER}")
print(f"Compute target: {compute_target.name}")
print(f"Environment: {env.name}")

Submit Training Job

Submit the training job to the compute cluster. The job will:

  1. Environment Setup (~2-5 minutes): Create or reuse Docker environment
  2. Data Download: Fetch training data from blob storage
  3. Model Training: Train all four classification models
  4. Evaluation: Perform cross-validation and generate metrics
  5. Model Persistence: Save trained models to outputs directory
# Submit the training job
if 'script_config' in locals():
    print("🚀 Submitting training job to Azure ML...")
    
    # Submit the run
    training_run = exp.submit(config=script_config)
    
    print(f"✅ Training job submitted successfully!")
    print(f"Run ID: {training_run.id}")
    print(f"Experiment: {exp.name}")
    print(f"Status: {training_run.get_status()}")
    
    # Display Azure ML Studio link
    studio_url = training_run.get_portal_url()
    print(f"\n🔗 Monitor progress in Azure ML Studio:")
    print(f"   {studio_url}")
    
    # Store run reference for monitoring
    current_run = training_run
    
else:
    print("❌ Cannot submit job - training configuration not available")
    print("   Please run the configuration cell above first")

Monitor Training Progress

You can monitor the training job in multiple ways:

  1. Azure ML Studio: Use the link above for a rich web interface
  2. Notebook Widget: Execute the cell below for inline monitoring
  3. Programmatic Status: Check status and logs programmatically

Training Timeline:

  • Image Creation (first run only): ~5 minutes
  • Cluster Scaling: ~2-3 minutes if nodes need to be allocated
  • Job Execution: ~2-5 minutes for this dataset size
  • Post-processing: ~1 minute to collect outputs
# Monitor training job with real-time output
if 'current_run' in locals():
    print(f"Monitoring training run: {current_run.id}")
    print(f"Current status: {current_run.get_status()}")
    
    # Wait for completion with live output streaming
    # Set show_output=True to see real-time logs from the training script
    try:
        result = current_run.wait_for_completion(show_output=True)
        
        print(f"\n{'='*60}")
        print("TRAINING COMPLETED")
        print(f"{'='*60}")
        
        print(f"Final Status: {result['status']}")
        print(f"Start Time: {result['startTimeUtc']}")
        print(f"End Time: {result['endTimeUtc']}")
        
        # Display key metrics
        metrics = current_run.get_metrics()
        if metrics:
            print("\n📊 Training Metrics:")
            for key, value in metrics.items():
                if 'accuracy' in key.lower():
                    print(f"  {key}: {value:.4f}")
        
        # List output files
        files = current_run.get_file_names()
        output_files = [f for f in files if f.startswith('outputs/')]
        if output_files:
            print(f"\n💾 Generated Model Files:")
            for file in output_files:
                print(f"  {file}")
        
    except KeyboardInterrupt:
        print("\n⏸️  Monitoring interrupted by user")
        print(f"   Training continues in background. Check status with: current_run.get_status()")
        
    except Exception as e:
        print(f"\n❌ Error during monitoring: {e}")
        print(f"   Check run status manually: {current_run.get_status()}")

else:
    print("❌ No active training run found")
    print("   Please submit a training job first")

Retrieve Training Results

After training completes, we can analyze the results and download the best performing model.
 

# Analyze training results and download best model
if 'current_run' in locals() and current_run.get_status() == 'Completed':
    
    print("📈 Analyzing training results...")
    
    # Get all metrics
    metrics = current_run.get_metrics()
    
    # Extract model performance metrics
    model_accuracies = {}
    for key, value in metrics.items():
        if '_testing_accuracy' in key:
            model_name = key.replace('_testing_accuracy', '')
            model_accuracies[model_name] = value
    
    if model_accuracies:
        print("\n🏆 Model Performance Summary:")
        sorted_models = sorted(model_accuracies.items(), key=lambda x: x[1], reverse=True)
        
        for i, (model, accuracy) in enumerate(sorted_models):
            rank_emoji = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else "📊"
            print(f"  {rank_emoji} {model}: {accuracy:.4f}")
        
        # Download best model
        best_model_name, best_accuracy = sorted_models[0]
        model_filename = f"{best_model_name.replace(' ', '_')}.pkl"
        
        print(f"\n💾 Downloading best model: {best_model_name}")
        try:
            # Download the best model file
            current_run.download_file(f"outputs/{model_filename}", output_file_path=model_filename)
            print(f"   ✅ Model downloaded: {model_filename}")
            print(f"   🎯 Best accuracy: {best_accuracy:.4f}")
            
        except Exception as e:
            print(f"   ❌ Download failed: {e}")
    
    else:
        print("⚠️  No model accuracy metrics found")
    
    # Display run summary
    print(f"\n📋 Run Summary:")
    print(f"   Run ID: {current_run.id}")
    print(f"   Experiment: {current_run.experiment.name}")
    print(f"   Status: {current_run.get_status()}")
    print(f"   Portal URL: {current_run.get_portal_url()}")

elif 'current_run' in locals():
    print(f"⏳ Training still in progress. Current status: {current_run.get_status()}")
    print("   Run this cell again after training completes")

else:
    print("❌ No training run available for analysis")
    print("   Please submit and complete a training job first")

Model Deployment and ADX Integration

Using Trained Models in Azure Data Explorer

Azure Data Explorer's Python plugin allows us to run trained models directly within KQL queries. This provides:

  • Real-time Inference: Score data as it arrives in ADX
  • Scalability: Leverage ADX's distributed architecture
  • Integration: Keep ML workflows within your data platform
  • Efficiency: Avoid data movement between systems
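To make the plugin's contract concrete: ADX hands the inline script a pandas DataFrame named `df` plus a `kargs` dict of caller-supplied parameters, and reads back whatever the script assigns to `result`. A local emulation of that contract (an illustrative sketch only, not the plugin itself):

```python
import pandas as pd

def run_python_plugin(df: pd.DataFrame, code: str, kwargs: dict) -> pd.DataFrame:
    """Emulate the ADX python() sandbox: expose df and kargs, return result."""
    scope = {"df": df, "kargs": kwargs, "pd": pd}
    exec(code, scope)          # the plugin executes the script in a sandbox
    return scope["result"]     # and returns the DataFrame named 'result'

sample = pd.DataFrame({"Temperature": [21.5, 23.0]})
out = run_python_plugin(sample, "result = df.assign(flag = df.Temperature > 22)", {})
print(out)
```

The same three names — `df`, `kargs`, `result` — appear in every KQL scoring query later in this notebook.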

Deployment Options

There are several ways to deploy models for use with ADX:

  1. Inline Model: Embed the serialized model directly in the KQL query text (suitable for small models)
  2. Model Table: Store the serialized model in an ADX table and load it at query time (the approach used in this notebook)
  3. External Endpoint: Host the model behind an Azure ML endpoint and call it from outside ADX

 

Prepare Model for ADX Deployment

We'll serialize the trained model to a hex string and store it in an ADX table along with its name and a timestamp.

# Store the trained model in an ADX table (original tutorial's approach)

print("📦 STORING MODEL IN ADX TABLE")
print("="*60)

# Step 1: Locate the downloaded model file
import os

model_files = [f for f in os.listdir('.') if f.endswith('.pkl')]

if model_files:
    model_path = model_files[0]
    print(f"✅ Using model file: {model_path}")
    
    # Step 2: Serialize model to hex string
    import datetime
    import pandas as pd
    
    models_tbl = 'ML_Models'
    model_name = 'AML-Occupancy'
    
    print(f"\n📊 Converting model to hex string...")
    
    with open(model_path, 'rb') as handle:
        buf = handle.read()
    
    # Convert binary to hex string (key step from original!)
    smodel = buf.hex()
    
    print(f"   Model size: {len(buf)} bytes")
    print(f"   Hex string length: {len(smodel)} characters")
    
    # Create DataFrame for ADX
    now = datetime.datetime.now()
    dfm = pd.DataFrame({
        'name': [model_name], 
        'timestamp': [now], 
        'model': [smodel]
    })
    
    print(f"\n🏗️ Creating ADX table and storing model...")
    
    # Create the ML_Models table
    create_table_query = f'''
.create table {models_tbl} (
    name: string,
    timestamp: datetime,
    model: string
)
'''
    
    try:
        %kql create_result << -query create_table_query
        print("✅ ML_Models table created")
    except Exception:
        # .create table fails if the table already exists; treat as non-fatal
        print("✅ ML_Models table already exists")
    
    # Store the model in the table
    ingest_query = f'''
.ingest inline into table {models_tbl} <|
{model_name},{now.isoformat()},{smodel}
'''
    
    try:
        %kql ingest_result << -query ingest_query
        print("✅ Model stored in ADX table!")
        
        # Verify storage
        verify_query = f'{models_tbl} | project name, timestamp, model_size = strlen(model)'
        %kql verify_result << -query verify_query
        
        if not verify_result.empty:
            print("✅ Model verification successful!")
            display(verify_result)
            
    except Exception as e:
        print(f"❌ Storage failed: {e}")

else:
    print("❌ No model file found")
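Before relying on the stored copy, it is worth sanity-checking the hex round-trip locally. A minimal sketch using a toy stand-in object (any picklable estimator round-trips the same way):

```python
import pickle

# Stand-in for a fitted classifier; any picklable object behaves identically
model = {"weights": [0.1, 0.2], "bias": -0.5}

# Serialize -> hex string: the form ingested into the ML_Models 'model' column
smodel = pickle.dumps(model).hex()

# Recover the object exactly as the ADX python() sandbox later will
restored = pickle.loads(bytes.fromhex(smodel))
print(restored == model)  # → True
```

`bytes.fromhex()` and `binascii.unhexlify()` (used in the scoring queries below) are equivalent decoders for this hex string.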

Create ADX Scoring Queries

Create ready-to-use KQL queries that incorporate the trained model for real-time predictions.

# ADX Model Scoring Queries (using stored model from table)

print("🎯 CREATING ADX SCORING QUERIES")
print("="*60)


# Use the variable names from the previous step
MODEL_TABLE = 'ML_Models'
MODEL_NAME = 'AML-Occupancy'

if 'MODEL_TABLE' in locals() and 'MODEL_NAME' in locals():
    
    print(f"✅ Using stored model:")
    print(f"   Table: {MODEL_TABLE}")
    print(f"   Model: {MODEL_NAME}")
    
    # Create a scoring query that loads the model from the ADX table.
    # The python plugin must be invoked via 'evaluate python()': the script
    # body is passed as a string, and scalars such as the model hex string
    # reach the sandbox through the kwargs property bag (exposed as 'kargs').
    scoring_query = f'''
let model_str = toscalar(
    {MODEL_TABLE}
    | where name == "{MODEL_NAME}"
    | top 1 by timestamp desc
    | project model
);
let kwargs = pack('smodel', model_str);
let code =
    'import pickle\\n'
    'import numpy as np\\n'
    '\\n'
    'model = pickle.loads(bytes.fromhex(kargs["smodel"]))\\n'
    'features = ["Temperature", "Humidity", "Light", "CO2", "HumidityRatio"]\\n'
    'X = df[features]\\n'
    'result = df\\n'
    'result["prediction"] = model.predict(X)\\n'
    'if hasattr(model, "predict_proba"):\\n'
    '    result["confidence"] = np.max(model.predict_proba(X), axis=1)\\n'
    'else:\\n'
    '    result["confidence"] = 1.0\\n'
    ;
OccupancyDetection
| take 5
| evaluate python(typeof(*, prediction:bool, confidence:real), code, kwargs)
| project Timestamp, Temperature, Humidity, Light, CO2, Occupancy, prediction, confidence
'''
    
    print("\n📋 Scoring query created!")
    print("   This query loads the model from ADX table and scores new data")
    
    # Store for testing
    SCORING_QUERY = scoring_query
    
else:
    print("❌ Model storage variables not found")
    print("   Please run the model storage step first")

Testing the Scoring Query

# Correct ADX Scoring using the Original Approach (Stored Functions)

print("🎯 USING ORIGINAL ADX SCORING APPROACH")
print("="*60)

# The original tutorial wraps the scoring logic in a KQL function applied with
# 'invoke'; the serialized model reaches the Python sandbox via the kwargs bag

scoring_from_table_query = r'''
let classify_sf=(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
    let code =
    'import pickle\n'
    'import binascii\n'
    '\n'
    'smodel = kargs["smodel"]\n'
    'features_cols = kargs["features_cols"]\n'
    'pred_col = kargs["pred_col"]\n'
    'bmodel = binascii.unhexlify(smodel)\n'
    'clf1 = pickle.loads(bmodel)\n'
    'df1 = df[features_cols]\n'
    'predictions = clf1.predict(df1)\n'
    '\n'
    'result = df\n'
    'result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])'
    '\n'
    ;
    samples | evaluate python(typeof(*), code, kwargs)
};
OccupancyDetection 
| where Test == 1
| extend pred_Occupancy=bool(0)
| invoke classify_sf(ML_Models, 'AML-Occupancy', pack_array('Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'), 'pred_Occupancy')
| summarize n=count() by Occupancy, pred_Occupancy
'''

print("📋 Original scoring query recreated!")
print("   Uses: a reusable KQL function applied with 'invoke'")
print("   Uses: the python plugin invoked via 'evaluate python()'")
print("   Uses: binascii.unhexlify() to decode the hex-encoded model")

print("\n🧪 Testing the original approach...")

scoring_results = None

try:
    %kql scoring_results << -query scoring_from_table_query
    
    if scoring_results is not None and not scoring_results.empty:
        print("✅ SUCCESS! Original approach works!")
        print(f"   Generated confusion matrix with {len(scoring_results)} rows")
        
        print("\n📊 Confusion Matrix Results:")
        display(scoring_results)
        
        # Calculate accuracy from confusion matrix
        if len(scoring_results) > 0:
            total_predictions = scoring_results['n'].sum()
            correct_predictions = scoring_results[scoring_results['Occupancy'] == scoring_results['pred_Occupancy']]['n'].sum()
            accuracy = correct_predictions / total_predictions if total_predictions > 0 else 0
            
            print(f"\n🎯 Model Performance:")
            print(f"   Total predictions: {total_predictions}")
            print(f"   Correct predictions: {correct_predictions}")
            print(f"   Accuracy: {accuracy:.3f}")
        
        print("\n🎉 COMPLETE SUCCESS!")
        print("   ✅ Model training: Complete")
        print("   ✅ Model storage in ADX: Complete")
        print("   ✅ Model scoring in ADX: Complete")
        print("   ✅ End-to-end pipeline: Working!")
        
    else:
        print("❌ Original approach returned no results")
        
except Exception as e:
    print(f"❌ Original approach failed: {e}")
    
    # Let's try a simpler version first
    print("\n🔧 Trying simplified version...")
    
    simple_scoring_query = r'''
let model_str = toscalar(ML_Models | where name == 'AML-Occupancy' | top 1 by timestamp desc | project model);
OccupancyDetection 
| take 3
| extend model_available = isnotnull(model_str)
| project Timestamp, Temperature, Occupancy, model_available
'''
    
    try:
        %kql simple_scoring << -query simple_scoring_query
        
        if 'simple_scoring' in locals() and not simple_scoring.empty:
            print("✅ Model retrieval works!")
            display(simple_scoring)
        else:
            print("❌ Model retrieval failed")
            
    except Exception as simple_e:
        print(f"❌ Simple test failed: {simple_e}")

print(f"\n{'='*60}")
print("KEY INSIGHT")
print("="*60)
print("The working pattern:")
print("   • Invoke the python plugin with 'evaluate python(typeof(*), code, kwargs)'")
print("   • Wrap the scoring logic in a reusable KQL function applied via 'invoke'")
print("   • Pass the serialized model through the kwargs bag and decode it in the")
print("     sandbox with binascii.unhexlify() (equivalent to bytes.fromhex())")
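The confusion-matrix rows returned by the `summarize` also support precision and recall, not just accuracy. A sketch with illustrative counts (assumed values, not real results from this run):

```python
import pandas as pd

# Shape of the KQL output: one row per (actual, predicted) pair with a count
cm = pd.DataFrame({
    "Occupancy":      [False, False, True, True],
    "pred_Occupancy": [False, True,  False, True],
    "n":              [900,   25,    40,    600],   # illustrative counts
})

tp = cm.loc[(cm.Occupancy) & (cm.pred_Occupancy), "n"].sum()
fp = cm.loc[(~cm.Occupancy) & (cm.pred_Occupancy), "n"].sum()
fn = cm.loc[(cm.Occupancy) & (~cm.pred_Occupancy), "n"].sum()

precision = tp / (tp + fp)
recall = tp / (tp + fn)
print(f"precision={precision:.3f} recall={recall:.3f}")
```

For occupancy detection, recall matters when missing an occupied room is costly (e.g. security), while precision matters when false alarms are costly (e.g. HVAC triggering).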

Summary and Next Steps

What We've Accomplished

This tutorial demonstrated a complete end-to-end ML workflow using Azure Machine Learning and Azure Data Explorer:

  • Data Management: Stored and queried sensor data efficiently in ADX
  • Model Training: Trained multiple classification models using Azure ML compute
  • Model Evaluation: Compared performance across different algorithms
  • Model Deployment: Deployed models for real-time inference in ADX
  • Integration: Seamlessly connected ADX and Azure ML workflows

Key Takeaways

  • Azure Data Explorer provides powerful capabilities for both data storage and ML inference
  • Azure Machine Learning offers managed, scalable compute for model training
  • Python Plugin in ADX enables sophisticated data processing and ML operations
  • Modern SDK patterns like ScriptRunConfig provide flexibility and maintainability