Microsoft 365

Microsoft 365 로그 관리 (2): Microsoft Defender for Identity (MDI) 로그 관리와 Power BI 연결

Pepuri 2025. 8. 24. 12:33
반응형

지난 포스팅:

2025.08.10 - [Microsoft 365] - Microsoft 365 로그 관리 (1) Sentinel로 시작하기

 

이번에는 MDI Log를 Sentinel에 연결한 뒤, Power BI에서 편집할 수 있게 연결하는 과정을 다루겠습니다.

Step 1. MDI 활성화 체크

경로: System → Settings → Identities

 

센서 활성화 확인: 최근 MDI v3는 DC가 MDE 온보딩만 되어 있어도 활성화가 가능해져 등록이 한결 간편해졌습니다.

(새 버전이 정식 릴리스되면 별도 글로 정리하겠습니다.)

 

 

Signal 확인: Advanced Hunting에서 AD Logon Event가 기록되는지 확인합니다.

→ 여기서 Signal이 보이면 Sentinel에서도 로그가 수집되는 것을 확인할 수 있습니다.

 

 

커넥터 설정: Microsoft Defender XDR → Open connector page

Microsoft Defender for Identity 옵션 체크 후 저장

 

 

일정 시간 경과 후 Sentinel에서 MDI 로그 조회 가능

 

Step 2. Sentinel 로그 Export를 위한 Enterprise App 등록

Advanced HuntingSentinel은 (현재 기준) 대량 쿼리에 제한 사항이 있습니다.

최종 목표는 Power BI 시각화로 인사이트를 얻는 것이므로, 우선 SharePoint에 CSV로 적재하는 흐름을 구성합니다.

이를 위해 Log Analytics API 호출 방식으로 접근하며, Enterprise App 등록이 필요합니다.

 

등록 절차

1. Entra 관리센터 → App registrations New registration

 

2. 이름 지정 → Register

 

3. API permissions → Add a permission

 

4. APIs my organization uses → Log Analytics API

 

5. Data.Read 체크 → Add permissions

 

6. Grant admin consent (테넌트에 동의 부여)

 

7. Certificates & secrets → New client secret → Description 입력 → Add

 

8. API 호출용 암호(Secret) 생성 개념

→ 생성된 Value는 별도의 파일에 복사

 

9. Log Analytics Workspaces → Access control (IAM) → Add role assignment

 

10. Log Analytics Reader 선택 → Next

 

11. 방금 만든 App에 권한 부여

 

Step 3. CSV로 내보내기

각각의 App에 사용할 정보

 

Tenant ID & Client ID

 

Workspace ID

 

Client Secret

 

AI를 통해서 Log Analytics API를 호출하는 스크립트를 아래와 같이 만들었습니다.
# === Authentication (Service Principal) ===
$TenantId = "<TENANT_ID>"
$ClientId = "<CLIENT_ID>"
$Secret   = "<CLIENT_SECRET>"  

# === Workspace ===
$WorkspaceId = "<WORKSPACE_ID>"

# === Extraction target / Period / Output ===
$Table        = "IdentityLogonEvents"
$StartUtc     = [datetime]"2025-08-12T00:00:00Z"
$EndUtc       = [datetime]::UtcNow
$ChunkHours   = 6  #6시간 단위로 추출
$OutDir       = "F:\sentinel\IdentityLogonEvents"
$FilePrefix   = "IdentityLogonEvents"
$SkipExisting = $true

# === Interval / Retry / Timeout ===
$MinIntervalSeconds = 30 #한번 호출한 뒤, 인터벌
$HttpTimeoutSeconds = 300
$MaxRetries         = 5
$BaseDelaySeconds   = 5

<# ======================= Utilities ======================= #>

# Create folder
New-Item -ItemType Directory -Force -Path $OutDir | Out-Null

# Token cache
$Script:TokenInfo = $null

function Get-LogAnalyticsToken {
    if ($Script:TokenInfo -and $Script:TokenInfo.ExpiresOn -gt (Get-Date).ToUniversalTime().AddMinutes(5)) {
        return $Script:TokenInfo.AccessToken
    }

    $body = @{
        client_id     = $ClientId
        client_secret = $Secret
        grant_type    = "client_credentials"
        scope         = "https://api.loganalytics.io/.default"
    }

    $tokenResponse = Invoke-RestMethod -Method Post `
        -Uri "https://login.microsoftonline.com/$TenantId/oauth2/v2.0/token" `
        -Body $body `
        -TimeoutSec $HttpTimeoutSeconds

    $Script:TokenInfo = [pscustomobject]@{
        AccessToken = $tokenResponse.access_token
        ExpiresOn   = (Get-Date).ToUniversalTime().AddSeconds([int]$tokenResponse.expires_in)
    }
    return $Script:TokenInfo.AccessToken
}

function Invoke-LAQuery {
    param(
        [Parameter(Mandatory=$true)] [string] $Kql,
        [Parameter(Mandatory=$true)] [string] $WorkspaceId
    )

    $attempt = 0
    while ($true) {
        $attempt++
        $token = Get-LogAnalyticsToken
        $headers = @{ Authorization = "Bearer $token" }
        $body    = @{ query = $Kql } | ConvertTo-Json

        try {
            return Invoke-RestMethod -Method Post `
                -Uri "https://api.loganalytics.azure.com/v1/workspaces/$WorkspaceId/query" `
                -Headers $headers -ContentType "application/json" `
                -Body $body -TimeoutSec $HttpTimeoutSeconds
        }
        catch {
            $status = $_.Exception.Response.StatusCode.value__
            $resp   = $null
            try { $resp = [System.IO.StreamReader]::new($_.Exception.Response.GetResponseStream()).ReadToEnd() } catch {}

            # 401: Refresh token
            if ($status -eq 401 -and $attempt -le $MaxRetries) {
                $Script:TokenInfo = $null
                Start-Sleep -Seconds ($BaseDelaySeconds * [math]::Pow(2, $attempt - 1))
                continue
            }

            # 429 or 5xx
            if (($status -eq 429 -or $status -ge 500) -and $attempt -le $MaxRetries) {
                $retryAfter = 0
                try { $retryAfter = [int]$_.Exception.Response.Headers["Retry-After"] } catch {}
                if ($retryAfter -le 0) {
                    $retryAfter = [int]($BaseDelaySeconds * [math]::Pow(2, $attempt - 1))
                }
                Write-Warning "Query throttled/failed (status $status). Retry in $retryAfter sec. Attempt $attempt/$MaxRetries"
                Start-Sleep -Seconds $retryAfter
                continue
            }

            throw "Log Analytics query failed (status $status): $resp"
        }
    }
}

function Convert-RowsToObjects {
    param(
        [Parameter(Mandatory=$true)] $ResultTable
    )
    $cols = $ResultTable.columns.name
    $rows = $ResultTable.rows | ForEach-Object {
        $o = [ordered]@{}
        for ($i=0; $i -lt $cols.Count; $i++) { $o[$cols[$i]] = $_[$i] }
        [pscustomobject]$o
    }

    foreach ($row in $rows) {
        foreach ($p in $row.PSObject.Properties) {
            $v = $p.Value
            if ($v -is [System.Collections.IDictionary] -or
                $v -is [System.Array] -or
                $v -is [PSCustomObject]) {
                $row.($p.Name) = ($v | ConvertTo-Json -Compress -Depth 50)
            }
        }
    }
    return $rows
}

function Wait-ForRateLimit($startedAt, [int]$minSeconds) {
    $elapsed = [int]((Get-Date).ToUniversalTime() - $startedAt).TotalSeconds
    $remain  = $minSeconds - $elapsed
    if ($remain -gt 0) { Start-Sleep -Seconds $remain }
}

<# ======================= Query Loop ======================= #>

$cursor = $StartUtc
while ($cursor -lt $EndUtc) {
    $iterStart = [datetime]::UtcNow

    $chunkStart = $cursor
    $chunkEnd   = $cursor.AddHours($ChunkHours)
    $cursor     = $chunkEnd

    $stamp   = $chunkStart.ToString("yyyyMMddHHmm")
    $outFile = Join-Path $OutDir ("{0}{1}.csv" -f $FilePrefix, $stamp)
    if ($SkipExisting -and (Test-Path $outFile)) {
        Write-Host "Skip: $outFile"
        Wait-ForRateLimit $iterStart $MinIntervalSeconds
        continue
    }

    $startIso = $chunkStart.ToString("yyyy-MM-ddTHH:mm:ssZ")
    $endIso   = $chunkEnd.ToString("yyyy-MM-ddTHH:mm:ssZ")

    $kql = @"
$Table
| where TimeGenerated >= datetime('$startIso')
| where TimeGenerated <  datetime('$endIso')
| order by TimeGenerated asc
"@

    Write-Host ("Query {0}Z ~ {1}Z" -f $chunkStart.ToString("s"), $chunkEnd.ToString("s"))

    try {
        $r = Invoke-LAQuery -Kql $kql -WorkspaceId $WorkspaceId

        if (-not $r.tables -or $r.tables.Count -eq 0 -or -not $r.tables[0]) {
            Write-Host "  -> No result table."
        } else {
            $rows = Convert-RowsToObjects -ResultTable $r.tables[0]
            if ($rows -and $rows.Count -gt 0) {
                $rows | Export-Csv -Path $outFile -NoTypeInformation -Encoding UTF8
                Write-Host ("  -> {0} rows -> {1}" -f $rows.Count, $outFile)

                if ($rows.Count -ge 450000) {
                    Add-Content -Path (Join-Path $OutDir "_oversized.txt") -Value "$startIso~$endIso,$($rows.Count)"
                    Write-Warning "Result very large ($($rows.Count) rows). Consider reducing chunk size for this period."
                }
            } else {
                Write-Host "  -> No rows."
            }
        }
    }
    catch {
        Write-Warning "Error range: $startIso ~ $endIso"
        Write-Warning "Error: $($_.Exception.Message)"
        Add-Content -Path (Join-Path $OutDir "_failed.txt") -Value "$startIso~$endIso"
    }

    Wait-ForRateLimit $iterStart $MinIntervalSeconds
}

Write-Host "Done. Output dir: $OutDir"

 

앞에서도 언급했듯이 API는 호출에 대한 제한사항이 있기 때문에 ChunkHours MinIntervalSeconds 을 적절하게 조절할 수 있어야 합니다.
정보 값이 정상적으로 입력되었다면, 아래와 같이 내보내기 과정이 진행됩니다.

 

Step 4. Power BI 연결 (SharePoint의 CSV 집합 로딩)

현재 Sentinel Data Lake 통합(프리뷰)은 있지만, Power BI 직접 연결은 시점에 따라 제한될 수 있습니다.

아래와 같이 Power BI를 연결할 수 있는 쿼리를 제공하지만, API 호출제한  때문에 BI에서 활용할 수 있는 별도의 적재 공간이 필요합니다.


우선 비용 부담이 적은 SharePoint Online에 적재 후 Power BI에서 집계합니다.

 

Power BI 연결 흐름

SharePoint에 CSV 업로드 (자동화 스크립트 또는 수동)

 

Power BI Desktop → 데이터 가져오기 → 빈 쿼리 → 고급 편집기

 

Power Query에서 SharePoint.Files() 함수를 사용해 폴더 내 CSV 병합

let
    // ========== ① User Settings ==========
    SiteUrl         = "https://clim823.sharepoint.com/sites/Sentinel",
    LibraryName     = "Shared Documents",      
    TargetFolder    = "IdentityLogonEvents",     
    FileNamePrefix  = "IdentityLogonEvents",     
    KeepLastNMonths = 6,

    // ========== ② File → Table Conversion Function ==========
    ParseCsv = (fileContent as binary) as table =>
        let
            csv = Csv.Document(
                    fileContent,
                    [Delimiter = ",", Columns = null, Encoding = 65001, QuoteStyle = QuoteStyle.Csv]
                  ),
            promoted = Table.PromoteHeaders(csv, [PromoteAllScalars = true])
        in
            promoted,

    // ========== ③ Navigate to Target Folder ==========
    Source      = SharePoint.Contents(SiteUrl, [ApiVersion = 15]),
    Library     = Source{[Name=LibraryName]}[Content],
    Folder      = Library{[Name=TargetFolder]}[Content],   // DeviceLogonEvents 

    // ========== ④ Filter Files ==========
    FilteredByName = Table.SelectRows(Folder, each Text.StartsWith([Name], FileNamePrefix)),
    FilteredByExt  = Table.SelectRows(FilteredByName, each Text.Lower([Extension]) = ".csv"),

    // ========== ⑤ Load Files → Convert to Tables ==========
    AddedData   = Table.AddColumn(FilteredByExt, "Data", each ParseCsv([Content]), type table),
    TablesList  = List.RemoveNulls(List.Transform(AddedData[Data], each try _ otherwise null)),

    // ========== ⑥ Align Schema & Merge ==========
    AllCols        = if List.Count(TablesList) = 0 
                     then {} 
                     else List.Distinct(List.Combine(List.Transform(TablesList, each Table.ColumnNames(_)))),
    AlignedTables  = List.Transform(TablesList, each Table.ReorderColumns(_, AllCols, MissingField.UseNull)),
    Appended       = if List.Count(AlignedTables) = 0 
                     then #table(AllCols, {}) 
                     else Table.Combine(AlignedTables),

    // ========== ⑦ Filter by Last N Months ==========
    WithTimestampTyped = if List.Contains(Table.ColumnNames(Appended), "Timestamp")
                         then Table.TransformColumnTypes(Appended, {{"Timestamp", type datetime}})
                         else Appended,

    FilteredByDate =
        if List.Contains(Table.ColumnNames(WithTimestampTyped), "Timestamp")
        then Table.SelectRows(WithTimestampTyped, each [Timestamp] >= Date.AddMonths(DateTime.LocalNow(), -KeepLastNMonths))
        else WithTimestampTyped
in
    FilteredByDate

 

예시 쿼리

아래와 같이 테이블을 가져옵니다. Excel에서 파워쿼리로 편집을 진행하는 것처럼 필요에 따라서 일부 열은 Json형태로 변경하여 활용합니다. -> 닫기 및 적용

  

 

 

 

닫기 및 적용 → Power BI 편집 진행

 

Power BI를 활용하면, 보안로그를 기반으로 한 대시보드를 제작 할 수 있습니다.
아래와 같이 시각적으로 어느 디바이스가 어떠한 루트로 공격 받는지 시각적으로 확인할 수 있습니다.

 

Power BI 관련 내용은 이미 Youtube에 많은 영상이 있기 때문에 제작 과정을 블로그에서 따로 다루지는 않겠습니다.
여기서 다른 Log들과 추가 연계한다면 더 좋은 인사이트를 얻을 수 있을 거라 생각합니다.

 

반응형