Elastic Security

Behavioral detection with graduated rewards

Overview

The Elastic Security verifier provides graduated rewards based on detection alert severity. Unlike binary pass/fail verification, this enables the model to learn from partial successes.

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    ELASTIC VERIFICATION FLOW                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Training Host                    Windows VM Pool               │
│  ──────────────                   ───────────────               │
│  │ RAFT Trainer │                 │ VM-01 │ VM-02 │             │
│  └──────┬───────┘                 └───┬───┴───┬───┘             │
│         │                             │       │                 │
│         │ 1. Compile via DEVBOX       │       │                 │
│         ├────────────────────►        │       │                 │
│         │                             │       │                 │
│         │ 2. Transfer binary ─────────┼───────┤                 │
│         │                             │       │                 │
│         │ 3. Execute ─────────────────┼───────┤                 │
│         │                             ▼       ▼                 │
│         │                         Elastic Agent                 │
│         │                             │       │                 │
│         │ 4. Telemetry ───────────────┼───────┤                 │
│         │                             ▼       ▼                 │
│         │                     ┌───────────────────┐             │
│         │ 5. Query alerts ◄───┤ Elastic Controller│             │
│         │                     │ (ES/Kibana/Fleet) │             │
│         ▼                     └───────────────────┘             │
│  Calculate Reward                                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Reward Mapping

Complete Reward Table

Stage    Outcome            Reward  Description
Format   No code extracted  0.0     Model output didn’t contain valid C++
Compile  MSVC error         0.1     Code extracted but compilation failed
Execute  Binary crashed     0.2     Compiled but crashed before detection
Detect   Critical alert     0.5     Highest severity detection
Detect   High alert         0.6     High severity detection
Detect   Medium alert       0.7     Medium severity detection
Detect   Low alert          0.8     Low severity detection
Detect   No alerts          1.0     Successfully evaded

Reward Calculation Logic

def calculate_reward(self, detection_result: DetectionResult) -> float:
    if not detection_result.alerts:
        return 1.0  # Full evasion
    
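    # Base reward from the most severe alert. Severities are strings, so
    # rank them explicitly; a plain max() would compare them alphabetically
    # and pick "medium" over "critical".
    severity_rewards = {
        "critical": 0.5,
        "high": 0.6,
        "medium": 0.7,
        "low": 0.8,
    }
    severity_rank = {"low": 1, "medium": 2, "high": 3, "critical": 4}

    highest_severity = max(
        (a.severity for a in detection_result.alerts),
        key=lambda s: severity_rank.get(s, 0),
    )
    base_reward = severity_rewards.get(highest_severity, 0.5)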
    
    # Penalties
    penalties = 0.0
    
    # Multiple rule triggers
    if len(detection_result.alerts) > 1:
        penalties += 0.05 * (len(detection_result.alerts) - 1)
    
    # Behavioral detection (EQL/ML rules)
    if any(a.rule_type in ["eql", "machine_learning"] for a in detection_result.alerts):
        penalties += 0.1
    
    # High risk score
    if detection_result.risk_score > 70:
        penalties += 0.05
    
    return max(0.5, base_reward - penalties)  # Never below threshold
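
A worked example may make the arithmetic easier to follow. The sketch below is a standalone restatement of the logic above, not the verifier's own code: the `Alert` class and the `graduated_reward` name are hypothetical stand-ins, technique-based penalties are left out, and severities are ranked explicitly so that the most severe alert is the one selected.

```python
from dataclasses import dataclass
from typing import List

SEVERITY_REWARDS = {"critical": 0.5, "high": 0.6, "medium": 0.7, "low": 0.8}
SEVERITY_RANK = {"low": 1, "medium": 2, "high": 3, "critical": 4}


@dataclass
class Alert:  # hypothetical stand-in for the verifier's alert type
    severity: str
    rule_type: str


def graduated_reward(alerts: List[Alert], risk_score: int) -> float:
    """Apply the severity mapping and the three fixed penalties."""
    if not alerts:
        return 1.0
    worst = max(alerts, key=lambda a: SEVERITY_RANK.get(a.severity, 0))
    reward = SEVERITY_REWARDS.get(worst.severity, 0.5)
    reward -= 0.05 * (len(alerts) - 1)  # each alert beyond the first
    if any(a.rule_type in ("eql", "machine_learning") for a in alerts):
        reward -= 0.10  # a behavioral rule fired
    if risk_score > 70:
        reward -= 0.05  # high risk score
    return max(0.5, reward)  # same floor as the original logic


single_low = graduated_reward([Alert("low", "query")], risk_score=21)
two_alerts = graduated_reward(
    [Alert("low", "query"), Alert("medium", "eql")], risk_score=47
)
```

Here a single low-severity query alert scores 0.8, while a low alert plus a medium EQL alert scores about 0.55: 0.7 for medium, minus 0.05 for the second alert, minus 0.10 for the behavioral rule. Because of the 0.5 floor, a critical detection always scores 0.5 regardless of penalties.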

Sample Correlation

malagent correlates Elastic alerts to specific samples using embedded markers.

Marker Injection

// Injected into each sample before compilation
// MSVC syntax; GCC and Clang spell this __attribute__((noinline))
__declspec(noinline) void sample_correlation_marker() {
    volatile char marker[] = "MALAGENT_SAMPLE_abc123def456";
    volatile char* p = marker;
    (void)p;  // Prevent optimization
}

Why Markers?

  • Multiple samples may execute simultaneously
  • Alert timestamps alone are unreliable
  • Markers appear in process memory telemetry
  • Enables precise reward attribution

Correlation Query

POST /.alerts-security.alerts-default/_search
{
  "query": {
    "bool": {
      "must": [
        {"range": {"@timestamp": {"gte": "now-2m"}}},
        {"term": {"host.hostname": "malagent-vm-01"}},
        {"bool": {
          "should": [
            {"match": {"process.args": "abc123def456"}},
            {"match": {"process.name": "sample_abc123def456.exe"}}
          ]
        }}
      ]
    }
  },
  "sort": [{"@timestamp": "desc"}],
  "size": 100
}
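
The same request body can be built per sample rather than hard-coded. The following is a minimal sketch under stated assumptions: `build_correlation_query` is a hypothetical helper name, and the `sample_<id>.exe` naming simply mirrors the example above. It sets `minimum_should_match` explicitly so that the marker filter stays mandatory even if the query is later restructured.

```python
from typing import Any, Dict


def build_correlation_query(
    hostname: str, sample_id: str, window: str = "now-2m", size: int = 100
) -> Dict[str, Any]:
    """Build the alert search body for one host and one sample ID."""
    return {
        "query": {
            "bool": {
                "must": [
                    {"range": {"@timestamp": {"gte": window}}},
                    {"term": {"host.hostname": hostname}},
                    {
                        "bool": {
                            "should": [
                                {"match": {"process.args": sample_id}},
                                {"match": {"process.name": f"sample_{sample_id}.exe"}},
                            ],
                            # At least one marker clause has to match
                            "minimum_should_match": 1,
                        }
                    },
                ]
            }
        },
        "sort": [{"@timestamp": "desc"}],
        "size": size,
    }


body = build_correlation_query("malagent-vm-01", "abc123def456")
```

The resulting dictionary can be serialized with `json.dumps` and sent as the body of the search request shown above.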

Technique-Based Penalties

Alerts mapped to certain MITRE ATT&CK techniques incur additional penalties because those techniques are common and heavily monitored:

TECHNIQUE_PENALTIES = {
    "T1059": 0.10,   # Command and Scripting Interpreter
    "T1059.001": 0.10,  # PowerShell
    "T1059.003": 0.10,  # Windows Command Shell
    "T1055": 0.15,   # Process Injection (heavily monitored)
    "T1055.001": 0.15,  # DLL Injection
    "T1055.012": 0.15,  # Process Hollowing
    "T1027": 0.05,   # Obfuscated Files (expected)
    "T1140": 0.05,   # Deobfuscate/Decode (expected)
    "T1106": 0.08,   # Native API (common trigger)
    "T1574": 0.12,   # Hijack Execution Flow
}

Penalty Application

# After base reward calculation
for alert in detection_result.alerts:
    for technique in alert.mitre_techniques:
        if technique in TECHNIQUE_PENALTIES:
            penalties += TECHNIQUE_PENALTIES[technique]
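
Because the loop runs over every alert, a technique reported by several alerts is penalized once per alert. The sketch below illustrates that behavior on a small subset of the table. The `technique_penalty` helper and its `dedupe` option are hypothetical and only shown for comparison; the source does not say whether deduplication is intended.

```python
from typing import Dict, Iterable, List

# Subset of the penalty table, enough for the example.
TECHNIQUE_PENALTIES: Dict[str, float] = {
    "T1055": 0.15,
    "T1055.012": 0.15,
    "T1027": 0.05,
}


def technique_penalty(
    alert_techniques: Iterable[List[str]], dedupe: bool = False
) -> float:
    """Sum penalties over alerts; with dedupe=True each technique counts once."""
    seen = set()
    total = 0.0
    for techniques in alert_techniques:
        for technique in techniques:
            if technique not in TECHNIQUE_PENALTIES:
                continue  # unlisted techniques carry no penalty
            if dedupe and technique in seen:
                continue
            seen.add(technique)
            total += TECHNIQUE_PENALTIES[technique]
    return total


# Technique lists for three alerts; T1204 is not in the table.
alerts = [["T1055", "T1055.012"], ["T1055"], ["T1027", "T1204"]]
per_alert = technique_penalty(alerts)               # same behavior as the loop
once_each = technique_penalty(alerts, dedupe=True)  # hypothetical variant
```

For these three alerts the per-alert sum is about 0.50, against about 0.35 when each technique is counted once.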

Configuration

Full Configuration

# configs/elastic_verifier.yaml
elastic:
  host: "10.0.20.145"
  port: 9200
  kibana_port: 5601
  
  auth:
    username: "elastic"
    password: "your_secure_password"
    # Or use API key:
    # api_key: "your_api_key"
  
  ssl:
    verify: false  # Set true in production
    ca_cert: null  # Path to CA certificate
  
  detection:
    timeout: 120        # Max wait for alerts (seconds)
    poll_interval: 5    # How often to check for alerts
    rule_delay: 30      # Initial delay before first poll
    min_confidence: 0.7 # Minimum correlation confidence
  
  rewards:
    evaded: 1.0
    low: 0.8
    medium: 0.7
    high: 0.6
    critical: 0.5
    compile_fail: 0.1
    format_fail: 0.0
  
  penalties:
    multi_rule: 0.05      # Per additional rule triggered
    behavioral: 0.10      # For EQL/ML detections
    high_risk: 0.05       # Risk score > 70
    technique_based: true # Enable MITRE penalties
  
  vm_pool:
    type: "proxmox"  # or "static"
    # See VM Pool documentation for details
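
A quick sanity check on the `rewards` section can catch a misordered mapping before a training run. This is a sketch only: `check_rewards` is a hypothetical helper, and it assumes the section has already been loaded into a plain dictionary with the keys shown above.

```python
from typing import Dict, List

# Keys of the `rewards` section, from worst outcome to best.
OUTCOME_ORDER = [
    "format_fail", "compile_fail", "critical", "high", "medium", "low", "evaded",
]


def check_rewards(rewards: Dict[str, float]) -> List[str]:
    """Return a list of problems found in the rewards section (empty if none)."""
    missing = [key for key in OUTCOME_ORDER if key not in rewards]
    if missing:
        return [f"missing keys: {', '.join(missing)}"]
    problems = []
    values = [rewards[key] for key in OUTCOME_ORDER]
    if any(not 0.0 <= value <= 1.0 for value in values):
        problems.append("rewards must lie in [0, 1]")
    if any(a > b for a, b in zip(values, values[1:])):
        problems.append("rewards must not decrease from format_fail to evaded")
    return problems


configured = {
    "evaded": 1.0, "low": 0.8, "medium": 0.7, "high": 0.6,
    "critical": 0.5, "compile_fail": 0.1, "format_fail": 0.0,
}
problems = check_rewards(configured)
```

With the values from the configuration above, `problems` is empty.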

Detection Latency

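Understanding detection timing is critical for reliable reward calculation: if the verifier stops polling before an alert becomes available, a detected sample is scored as evaded.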

Latency Breakdown

Sample execution complete
    │
    ├─── Elastic Agent batch interval ─────── 10s
    │
    ├─── Network transmission ─────────────── 1-2s
    │
    ├─── Elasticsearch indexing ───────────── 1-5s
    │
    ├─── Detection rule evaluation ────────── 5-15s
    │
    └─── Alert available via API ─────────── 30-60s total

Optimal Polling Strategy

import asyncio
from typing import List

async def poll_for_alerts(sample_id: str, execution_time: float) -> List[Alert]:
    """Poll for alerts, backing off in 5-second steps."""

    # Initial delay - let telemetry propagate
    await asyncio.sleep(30)

    # 12 polls. As written, the waits add up to 210 s on top of the initial
    # 30 s, which is longer than 2 minutes; lower the range to fit a
    # shorter detection timeout.
    for attempt in range(12):
        alerts = await query_alerts(sample_id, since=execution_time)

        if alerts:
            return alerts

        # Stepped linear backoff: 5, 5, 10, 10, 15, 15, ...
        delay = 5 + (attempt // 2) * 5
        await asyncio.sleep(delay)

    return []  # No alerts = evaded
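
To see how this schedule lines up with a configured timeout, the poll times can be computed directly. A minimal sketch, assuming the 30-second initial delay and the `5 + (attempt // 2) * 5` step from the function above, and ignoring the time each query takes; `poll_times` is a hypothetical helper name.

```python
from itertools import accumulate
from typing import List


def poll_times(initial_delay: int = 30, attempts: int = 12, step: int = 5) -> List[int]:
    """Seconds after execution at which each poll is issued."""
    # The first poll follows the initial delay; each later poll follows the
    # wait computed after the previous attempt.
    waits = [step + (attempt // 2) * step for attempt in range(attempts - 1)]
    return list(accumulate(waits, initial=initial_delay))


times = poll_times()
within_timeout = [t for t in times if t <= 120]
```

The twelve polls fall at 30, 35, 40, 50, 60, 75, 90, 110, 130, 155, 180 and 210 seconds, so only the first eight fit inside a 120-second timeout.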

Alert Processing

Alert Structure

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional

@dataclass
class ElasticAlert:
    rule_name: str
    severity: str  # critical, high, medium, low
    risk_score: int  # 0-100
    rule_type: str  # query, eql, threshold, machine_learning
    mitre_techniques: List[str]
    timestamp: datetime
    process_name: Optional[str]
    process_args: Optional[str]
    host_hostname: str

Processing Pipeline

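def process_alerts(raw_alerts: List[dict]) -> DetectionResult:
    # Each raw alert is assumed to be flattened to dotted keys
    # (for example "host.hostname" rather than nested objects).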
    alerts = []
    for raw in raw_alerts:
        alert = ElasticAlert(
            rule_name=raw["kibana.alert.rule.name"],
            severity=raw["kibana.alert.severity"],
            risk_score=raw.get("kibana.alert.risk_score", 0),
            rule_type=raw["kibana.alert.rule.type"],
            mitre_techniques=extract_techniques(raw),
            timestamp=parse_timestamp(raw["@timestamp"]),
            process_name=raw.get("process.name"),
            process_args=raw.get("process.args"),
            host_hostname=raw["host.hostname"],
        )
        alerts.append(alert)
    
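    # Rank severities explicitly; max() on the raw strings would compare
    # them alphabetically and pick "medium" over "critical".
    severity_rank = {"low": 1, "medium": 2, "high": 3, "critical": 4}

    return DetectionResult(
        alerts=alerts,
        highest_severity=max(
            (a.severity for a in alerts), key=lambda s: severity_rank.get(s, 0)
        ) if alerts else None,
        risk_score=max(a.risk_score for a in alerts) if alerts else 0,
        rule_count=len(set(a.rule_name for a in alerts)),
    )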
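
The pipeline calls `extract_techniques` without showing it. One possible shape is sketched below. It assumes the rule's threat mapping is available on the alert under `kibana.alert.rule.threat`, in the layout detection rules use for their `threat` field (a list of entries, each with a `technique` list and optional `subtechnique` lists); the real helper and field path may differ.

```python
from typing import Any, Dict, List


def extract_techniques(raw: Dict[str, Any]) -> List[str]:
    """Collect technique and sub-technique IDs from one alert document."""
    ids: List[str] = []
    # Assumed field path; missing or null values are treated as empty lists.
    for threat in raw.get("kibana.alert.rule.threat") or []:
        for technique in threat.get("technique") or []:
            if technique.get("id"):
                ids.append(technique["id"])
            for sub in technique.get("subtechnique") or []:
                if sub.get("id"):
                    ids.append(sub["id"])
    # Drop duplicates while preserving first-seen order
    return list(dict.fromkeys(ids))


example = {
    "kibana.alert.rule.threat": [
        {
            "framework": "MITRE ATT&CK",
            "technique": [
                {"id": "T1055", "subtechnique": [{"id": "T1055.012"}]},
                {"id": "T1106"},
            ],
        },
        {"framework": "MITRE ATT&CK", "technique": [{"id": "T1055"}]},
    ]
}
techniques = extract_techniques(example)
```

For the example document this yields `T1055`, `T1055.012` and `T1106`, each listed once.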

Usage

Basic Usage

from malagent.verifiers import ElasticVerifier

verifier = ElasticVerifier(config_path="configs/elastic_verifier.yaml")

# Verify a single sample
result = verifier.verify(
    prompt="Write a function using NtAllocateVirtualMemory...",
    completion="#include <windows.h>\n..."
)

print(f"Reward: {result.reward}")
print(f"Compiled: {result.compiled}")
print(f"Executed: {result.executed}")
print(f"Alerts: {len(result.alerts)}")

Batch Verification

# Verify multiple samples in parallel
results = verifier.verify_batch(
    prompts=["prompt1", "prompt2", ...],
    completions=["code1", "code2", ...]
)

for r in results:
    print(f"{r.sample_id}: reward={r.reward}, alerts={len(r.alerts)}")

Statistics

The verifier tracks detection statistics:

verifier.print_stats()

Output:

ELASTIC VERIFIER STATISTICS
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

Samples verified: 4,552
  Compiled: 2,122 (46.6%)
  Executed: 1,891 (41.5%)
  
Detection results:
  Evaded: 234 (12.4%)
  Low: 412 (21.8%)
  Medium: 623 (32.9%)
  High: 489 (25.9%)
  Critical: 133 (7.0%)

Average reward: 0.72
Reward distribution:
  0.0-0.2: ████████ 2,430 (53.4%)
  0.5-0.6: ███ 622 (13.7%)
  0.6-0.7: ████ 901 (19.8%)
  0.7-0.8: ██ 365 (8.0%)
  0.8-1.0: █ 234 (5.1%)

Troubleshooting

No Alerts Appearing

  1. Check Elastic Agent status in Fleet
  2. Verify detection rules are enabled
  3. Increase detection timeout
  4. Check correlation markers are being injected

All Samples Getting Same Reward

  1. Verify sample correlation is working
  2. Check timestamp synchronization between hosts
  3. Review alert query for filtering issues

High False Positive Rate

  1. Review detection rules - disable overly broad rules
  2. Adjust technique penalties
  3. Increase correlation confidence threshold