Привет! Меня зовут Максим Морозов, я AI Project Manager в Битрикс24.В предыдущей статье я рассказывал о локальных нейросетях как безопасной и экономичной альтерПривет! Меня зовут Максим Морозов, я AI Project Manager в Битрикс24.В предыдущей статье я рассказывал о локальных нейросетях как безопасной и экономичной альтер

«Мы не дообучаем нейросеть, мы дообучаем скрипт»: Как Mac Mini и локальная LLM писали для нас Regex

2026/02/09 13:00
11м. чтение

Привет! Меня зовут Максим Морозов, я AI Project Manager в Битрикс24.

В предыдущей статье я рассказывал о локальных нейросетях как безопасной и экономичной альтернативе облачным API. Сегодня — практический кейс, где мы применили этот подход в реальном проекте.

Главная идея этой работы: вместо дообучения (Fine-Tuning) нейросети на своих данных — что долго, дорого и требует поддержки датасета — мы используем штатную модель без дополнительного обучения. Модель генерирует regex, а скрипт сохраняет эти правила и использует их автономно.

Я покажу архитектуру системы, где локальная LLM генерирует регулярные выражения для парсинга логов, экономя сотни часов ручной отладки. Все вычисления происходят внутри периметра компании, без отправки данных в облако.

Проблема: Парсинг разнообразных логов для Wazuh

У нас есть централизованный Syslog-сервер, куда стекаются данные со всех сервисов отдела:

  • Стандартные логи: Nginx, Apache, системные логи Linux;

  • Кастомный софт и микросервисы;

  • Самописные скрипты с собственными форматами вывода.

Вся эта информация агрегируется в Wazuh (Open Source SIEM) — система мониторинга безопасности, которая умеет реагировать на инциденты. Но для этого Wazuh должен понимать структуру лога: где IP-адрес, где уровень критичности (Critical/Warning), где информационные сообщения.

Нативные XML-декодеры Wazuh

Чтобы Wazuh понял нестандартный лог, нужно написать XML-декодер с регулярным выражением внутри. Процесс выглядит так:

  1. Анализируете формат нового лога;

  2. Пишете XML-декодер с Regex;

  3. Загружаете конфигурацию в Wazuh;

  4. Перезапускаете сервис.

Основная проблема — отладка. Если в Regex есть ошибка:

  • Wazuh не указывает строку с ошибкой;

  • Не объясняет характер проблемы.

Отладка превращается в итеративный процесс: копируете XML в сторонние валидаторы, проверяете Regex, видите что работает — вставляете в Wazuh, получаете ошибку. Когда у вас десятки сервисов с постоянно меняющимися форматами логов, поддержка SIEM становится значительной нагрузкой.

Пример простого XML декодера:

```xml <!-- Простой декодер с предварительным matching --> <decoder name="custom-app"> <prematch>^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}</prematch> </decoder> <!-- Дочерний декодер с извлечением полей --> <decoder name="custom-app-fields"> <parent>custom-app</parent> <regex>^(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}) (\w+) (.+)$</regex> <order>date, time, level, message</order> </decoder> ```

Пример сложного декодера:

2cafa38baf0dbb298afa906b17970091.png

Каждый новый сервис — это новый XML файл, свои regex, и цикл отладки. Решение - один штатный json декодер.

Смена архитектуры: JSON-проксирование

Wazuh имеет встроенный JSON-декодер, который работает намного надежнее XML-декодеров.

<decoder name="json-msgraph"> <prematch>"integration":"ms-graph"</prematch> <plugin_decoder>JSON_Decoder</plugin_decoder> <json_null_field>discard</json_null_field> </decoder> <decoder name="json"> <prematch>^{\s*"</prematch> <plugin_decoder>JSON_Decoder</plugin_decoder> </decoder>

Если подать в SIEM не сырую строку, а структурированный JSON:

```json { "timestamp": "2025-01-15", "level": "Error", "user": "admin", "action": "failed login", "src_ip": "192.168.1.5" }

...то Wazuh автоматически распарсит поля. Достаточно настроить правила вида: «Если поле level равно Error — создай алерт».

Новая архитектура

Мы вынесли логику парсинга за пределы Wazuh. Теперь пайплайн выглядит так:

  1. Syslog-сервер принимает сырые логи;

  2. Python-скрипт (пре-парсер) перехватывает их на лету;

  3. Скрипт применяет к строке регулярное выражение;

  4. Если строку удалось распарсить - отдаем в Wazuh.

Проблема с декодерами решена, но осталась одна задача: нам нужны регулярные выражения для Python-скрипта, и их нужно писать под каждую новую сигнатуру лога. Именно здесь мы подключили локальную LLM.

Железо и Стек: Зачем нам Mac Mini в отделе

Обычно, когда говорят про AI, представляют стойки с NVIDIA H100 или мощные GPU-серверы. Но для нашей задачи это избыточно. Мы использовали уже имеющийся Mac Mini на чипе M4 Pro и протестировали его на этой задаче.

Почему не облако?

  1. Логи могут содержать IP-адреса, пути, логины. Отправлять эти данные в облако — нарушение контура безопасности. Локальная модель гарантирует, что данные не покидают периметр;

  2. Облачные API стоят денег. Mac Mini потребляет 30–40 Вт — это копейки по сравнению с оплатой облачных API.

Софт и Модель

  • Среда исполнения: LM Studio или Ollama. Предоставляют API, совместимый с OpenAI;

  • Модель: мы используем собственную модель BitrixGPT, но подойдут и другие. Например GPT-OSS-20B — open-weight от OpenAI;

  • Производительность: ~70 токенов/сек на M4 Pro.

Таким образом, у нас появился бесплатный, приватный и быстрый сервис генерации.

Архитектура решения

Прежде чем переходить к алгоритму, рассмотрим архитектуру системы.

Компоненты

  • Drain3 Manager: Кластеризация лог-строк по шаблонам;

  • Regex Generator: LLM-генерация паттернов с валидацией и retry-механизмом;

  • Regex Parser: Применение проверенных паттернов к логам;

  • Stream Listener: Real-time обработка через TCP-сокет для syslog-ng;

  • Feedback Loop: Дообучение системы на нераспознанных строках;

  • Service Registry: Фильтрация и управление списком сервисов.

Потоки данных

Full Pipeline (для пакетной обработки файлов):

Log File → Ingest (Drain3) → Signatures→

LLM Generation → Regex → Parse → Output

Stream Pipeline (для real-time обработки):

syslog-ng → TCP Socket → Drain3 clustering →

Parse with existing regex → OK or FAIL buffer →

[threshold reached] → Feedback to Drain3 → Generate new regex → Re-parse FAIL

Структура данных

**Signatures JSON** (выход Drain3): ```json [ { "cluster_id": "c2", "size": 85, "signature": "<DATE> <TIME> <LEVEL> Login failed user=<*> ip=<*>", "examples": [ "2025-01-15 10:30:00 ERROR Login failed user=admin ip=192.168.1.5", "2025-01-15 10:31:00 ERROR Login failed user=guest ip=10.0.0.1" ] }, { "cluster_id": "c3", "size": 200, "signature": "<DATE> <TIME> <LEVEL> Request processed by <*> in <*> ms", "examples": [ "2025-01-15 10:30:00 INFO Request processed by worker-3 in 150 ms", "2025-01-15 10:31:00 INFO Request processed by worker-1 in 89 ms" ] } ]

Regex JSON (результат генерации):

[ { "cluster_id": "c2", "signature": "<DATE> <TIME> <LEVEL> Login failed user=<*> ip=<*>", "size": 85, "regex": "^(?P<date>\\d{4}-\\d{2}-\\d{2}) (?P<time>\\d{2}:\\d{2}:\\d{2}) (?P<level>\\w+) Login failed user=(?P<user>\\S+) ip=(?P<ip>[\\d.]+)$", "fields": ["date", "time", "level", "user", "ip"], "validation": { "valid": true, "matched": 85, "total": 85 } }, { "cluster_id": "c3", "signature": "<DATE> <TIME> <LEVEL> Request processed by <*> in <*> ms", "size": 200, "regex": "^(?P<date>\\d{4}-\\d{2}-\\d{2}) (?P<time>\\d{2}:\\d{2}:\\d{2}) (?P<level>\\w+) Request processed by (?P<worker>\\S+) in (?P<duration>\\d+) ms$", "fields": ["date", "time", "level", "worker", "duration"], "validation": { "valid": true, "matched": 200, "total": 200 } } ]

Output (финальный формат):

{"date": "2025-01-15", "time": "10:30:00", "level": "ERROR", "user": "admin", "ip": "192.168.1.5"} {"date": "2025-01-15", "time": "10:31:00", "level": "ERROR", "user": "guest", "ip": "10.0.0.1"} {"date": "2025-01-15", "time": "10:30:00", "level": "INFO", "worker": "worker-3", "duration": "150"} {"date": "2025-01-15", "time": "10:31:00", "level": "INFO", "worker": "worker-1", "duration": "89"}

Два режима работы

Система поддерживает два режима обработки логов:

1. Batch Mode (пакетная обработка):

Используется для обработки исторических данных или разового анализа логов.

Log File → Ingest (Drain3) → Signatures →

LLM Generation → Regex → Parse → Wazuh

2. Stream Mode (real-time обработка):

Используется для обработки логов в реальном времени.

syslog-ng → TCP Socket → Stream Listener → Drain3 clustering →

Parse with existing regex → OK or FAIL buffer →

[threshold] → Feedback → Generate new regex → Re-parse FAIL

Мы используем оба режима: Stream Mode для текущих логов, Batch Mode для ретроспективного анализа.

Алгоритм работы: «Инверсия обучения»

Ключевая идея проекта — вместо Fine-Tuning модели мы используем стандартную модель как универсальный инструмент генерации regex. Модель не "запоминает" логи, она создает правила, которые скрипт сохраняет и применяет самостоятельно.

Вот пошаговый разбор пайплайна:

Шаг 1: Кластеризация (Drain3)

Самая большая ошибка — пытаться скормить нейросети каждую строку лога. Это убьет производительность. Мы используем Python-библиотеку Drain3. Это алгоритм, который в реальном времени читает поток строк и выявляет шаблоны (сигнатуры).

Пример:

  • Строка 1: User admin login failed from 192.168.1.1

  • Строка 2: User guest login failed from 10.0.0.5

Drain3 определяет, что это один шаблон: User <*> login failed from <*> и присваивает ему уникальный ID кластера. Если скрипт видит лог с известным ID кластера, он применяет готовое правило без обращения к LLM.

Пример реализации:

def process_line(self, line: str) -> Optional[str]: """Обработка одной строки лога через Drain3.""" result = self._template_miner.add_log_message(line) cluster_id = result.get("cluster_id") if cluster_id: if cluster_id not in self._cluster_examples: self._cluster_examples[cluster_id] = [] if len(self._cluster_examples[cluster_id]) < 5: self._cluster_examples[cluster_id].append(line) return cluster_id

Шаг 2: Генерация (Промпт-инжиниринг)

Если Drain3 сигнализирует о новом шаблоне, скрипт собирает буфер из 5 реальных примеров и формирует промпт для локальной модели.

Логика retry: Максимум 3 попытки исправить невалидный regex. Каждая попытка включает feedback об ошибке. При успехе сохраняем regex.

Модель возвращает regex, которое мы сразу валидируем.

Пример regex, сгенерированного моделью:

5641aad9d166b2110c8675f8ba9cd793.png

40+ именованных групп. Такой regex вручную писать долго и чревато ошибками, а LLM генерирует его за секунды.

Метрики эффективности

На реальных данных наш подход показывает следующие результаты:

  • Точность генерации: 85-90% regex проходят валидацию с первой попытки;

  • Retry-эффективность: После 2-3 попыток успешность достигает 98%+;

  • Скорость обработки: Stream Mode обрабатывает ~100-500 строк/сек (зависит от сложности regex);

  • Инкрементальная обработка: Повторные запуски быстрее в 50-100 раз благодаря hash-based deduplication.

Шаг 3: Валидация с автоматическим исправлением

Мы не доверяем коду от нейросети без проверки. Скрипт получает ответ от модели и сразу валидирует его — пытается применить этот regex к тем же 5 примерам. Если regex не работает, система возвращает примеры, regex и ошибку обратно в модель с просьбой исправить

Пример функции валидации:

def _validate_regex(regex: str, examples: List[str]) -> Dict[str, Any]: """Компилируем regex и проверяем на примерах.""" try: compiled = re.compile(regex) except re.error as exc: return { "valid": False, "matched": 0, "total": len(examples), "error": f"Compile error: {exc}" } matched = sum(1 for ex in examples if compiled.match(ex.strip())) return { "valid": matched == len(examples), "matched": matched, "total": len(examples), "error": None if matched == len(examples) else f"Matched only {matched}/{len(examples)}" }

Логика обработки результата:

  • Regex работает и извлекает данные → сохраняем в базу и привязываем к ID кластера Drain3

  • Ошибка или неполное совпадение → скрипт просит модель перегенерировать regex (обычно со второй попытки получается идеально)

Шаг 4: Продакшн (Кэширование)

Теперь, когда для этого шаблона есть проверенный Regex, нейросеть больше не нужна.

При появлении такого лога скрипт достает правило из базы, парсит строку и отправляет в Wazuh.

Итог: Нейросеть работает только в момент появления нового сервиса или изменения формата логов. 99.9% времени система работает на чистом CPU, потребляя минимум ресурсов.

Технические грабли и нюансы

В теории всё звучит красиво, но на практике мы собрали достаточно граблей. Вот что стоит учесть, если захотите повторить наш опыт.

1. Зарезервированные поля Wazuh

На практике мы обнаружили, что некоторые имена полей могут вызывать конфликты с внутренней логикой Wazuh. Например в правиле нельзя использовать имя "action" для полей. Вместо "action" используй "log_action" и т.д. https://documentation.wazuh.com/current/user-manual/ruleset/ruleset-xml-syntax/rules.html

2. Regex-галлюцинации

Иногда модель увлекается и выдает избыточно сложные Regex — например, использует сложные Lookbehind-проверки или атомарные группы, которые медленные на больших объемах.

А иногда наоборот создается слишком “жадные” и пытается положить 90% лога в один (?P<message>.+) или (?P<info>.+)

Хорошо, что все эти сложности решаются обычной инструкцией:

1. Используй ТОЛЬКО именованные группы вида (?P<name>pattern)

2. ЗАПРЕЩЕНО:

  • lookahead (?=...)

  • lookbehind (?<=...)

2.1 создавать группу (?P<message>.+) или (?P<info>.+), если "хвост" лога содержит структуру

Структурой считаются:

  • URL-адреса (ws://..., http://...)

  • IP-адреса

  • Числа

  • Устойчивые фразы (Connecting to, Failed at, User logged in)

2.2 Запрещено использовать "action" для названия полей. Вместо "action" используй "log_action" и т.д.

Оптимизации

Инкрементальная обработка

Пересчитывать все кластеры при каждом запуске дорого. Добавили hash-based идентификацию обработанных кластеров.

def _compute_cluster_hash(signature: str, examples: List[str]) -> str: """Вычисляем стабильный хеш кластера.""" if not examples: return hashlib.sha256(signature.encode()).hexdigest()[:16] content = f"{signature}|{examples[0]}" return hashlib.sha256(content.encode()).hexdigest()[:16] # В основном цикле: if _compute_cluster_hash(cluster.signature, cluster.examples) in processed_hashes: print(f"Skipping already processed: {cluster.cluster_id}") continue

Feedback Loop с автоматическим триггером

Новые типы логов появляются в произвольные моменты. Реализовали буферизацию FAIL-строк с триггером по threshold ИЛИ интервалу.

def _check_and_trigger_feedback(self, service: str) -> None: """Проверка условий для запуска feedback loop.""" if service not in self.feedback_buffers: return buffer_size = len(self.feedback_buffers[service]) now = datetime.now() last_trigger = self.last_trigger_times.get(service, now) # Триггер: накопилось 100 FAIL строк ИЛИ прошло 5 минут threshold_reached = buffer_size >= 100 interval_reached = (now - last_trigger).total_seconds() >= 300 if threshold_reached or (interval_reached and buffer_size > 0): # Feed to Drain3 → Generate new regex → Re-parse FAIL self._trigger_feedback_loop(service) self.last_trigger_times[service] = now

Graceful Shutdown и persistence

При падении сервиса можно потерять текущее состояние. Добавили периодическое сохранение + graceful shutdown.

def gracefulshutdown(self): """Сохранение состояния при остановке.""" for service, manager in self.drain_managers.items(): manager.save_state() # Сохраняем Drain3 состояние manager.dump_signatures() # Сохраняем сигнатуры

Гибридная схема (Local + Cloud)

Локальная модель на Mac Mini идеально справляется с тактической задачей — написанием конкретных регулярных выражений. Но у Wazuh есть и стратегический уровень — иерархия правил (Rules).

Это когда одно правило зависит от другого: «Если сработал парсер Nginx (Parent), И уровень ошибки Critical, И IP из черного списка (Children) — тогда бей тревогу».

Строить такую красивую древовидную структуру (XML) локальной модели сложновато — ей не хватает контекста и «интеллекта» для глобальной логики.

Поэтому мы используем гибридный подход:

  1. Локально (Mac Mini): Парсим «сырые» логи, генерируем базовые Regex-кирпичики и создаем xml для правило для каждой сигнатуры. Здесь остаются все персональные данные (IP, логины).

  2. Анонимизация: Скрипт берет пачку готовых, проверенных regex, xml правила для сервиса, убирает примеры логов, оставляя только структуру полей (src_ip, user_name),

  3. Облако: Обезличенную структуру мы отправляем в большую модель

Запрос: «Вот 30 regex и 30 xml. Построй из них оптимальную иерархию XML-правил для Wazuh с наследованием».

Результат: Облачная модель возвращает структурированный XML, который мы импортируем в Wazuh .

<group name="optimized-rules"> <!-- ================================================== --> <!-- 🚦 SECTION 1: TRAFFIC JOURNAL (База ID: 26000) --> <!-- ================================================== --> <!-- РОДИТЕЛЬСКОЕ ПРАВИЛО ДЛЯ TRAFFIC-JOURNAL --> <rule id="26000" level="0"> <decoded_as>json</decoded_as> <field type="pcre2" name="log_type">^traffic-journal$</field> <description>Base rule for Traffic Journal logs</description> </rule> <!-- Дети: Actions (Drop/Accept) --> <rule id="26008" level="5"> <if_sid>26000</if_sid> <field type="pcre2" name="result">^drop$</field> <description>Traffic journal: Result DROP</description> <group>traffic-journal, drop</group> </rule>

и т.д.

Так мы автоматизируем задачу администратора SIEM на 80-90%, оставляя за человеком только финальное «ОК». И при этом никакие чувствительные данные не покидают периметр — в облако уходит только сухая логика правил.

Заключение

Что получилось:

  • Автоматическая генерация Regex и XML без участия человека;

  • Real-time обработка с feedback loop для “дообучения” скрипта на новых типах логов;

  • Приватность данных — вся обработка чувствительных данных внутри периметра компании.

Если у вас есть возможность, то используйте LLM как архитектора правил, а не как исполнителя.

Источник

Отказ от ответственности: Статьи, размещенные на этом веб-сайте, взяты из общедоступных источников и предоставляются исключительно в информационных целях. Они не обязательно отражают точку зрения MEXC. Все права принадлежат первоисточникам. Если вы считаете, что какой-либо контент нарушает права третьих лиц, пожалуйста, обратитесь по адресу [email protected] для его удаления. MEXC не дает никаких гарантий в отношении точности, полноты или своевременности контента и не несет ответственности за любые действия, предпринятые на основе предоставленной информации. Контент не является финансовой, юридической или иной профессиональной консультацией и не должен рассматриваться как рекомендация или одобрение со стороны MEXC.