Привет, Хабр!
Менеджеры продаж не всегда вовремя и полно заносят данные о сделках в CRM после звонка. Часть информации может забыться, а часть может быть записана сокращенно.
Прослушивать звонки вручную и восстанавливать детали слишком трудоёмко, а ресурсов на это часто не хватает. Поэтому в этом материале соберём MVP-сервис на Python, который получает событие о завершённом звонке из МТС Exolve, забирает текст разговора, выделяет из него ключевые поля через YandexGPT и записывает результат в Bitrix24. На выходе получится рабочий пайплайн: вебхук, транскрибация, извлечение полей квалификации и обновление существующей сделки в CRM.
За основу возьмём BANT — базовый фреймворк квалификации лида: Budget, Authority, Need, Timing, то есть бюджет, лицо, принимающее решение, потребность и сроки. И расширим его, добавив оценку интереса клиента, фиксацию конкурентов и возражений. Такого набора достаточно, чтобы квалифицировать лид, приоритизировать сделку и сохранить контекст следующего контакта, но не превращать карточку в анкету на десятки полей.
Стек: Python 3.10+, Flask, SQLite, Call Transcribation API МТС Exolve, YandexGPT API, Bitrix24 REST API.
Разделим MVP на пять компонентов. Такой расклад даёт понятный поток данных и не смешивает вебхук, работу с внешними API и запись в CRM в одном файле.
app.py принимает вебхук о завершении звонка, валидирует запрос, запускает фоновую обработку и отдаёт быстрый ответ.
database.py хранит состояние пайплайна и защищает систему от дублей по call_id.
exolve_api.py забирает транскрибацию из МТС Exolve и приводит её к формату диалога.
yandex_llm.py формирует строгий запрос к модели и парсит ответ в JSON.
bitrix24_crm.py ищет сделку и обновляет пользовательские поля Bitrix24.
Границы ответственности простые: HTTP-слой только принимает событие, база хранит состояние обработки, а интеграции с МТС Exolve, YandexGPT и Bitrix24 изолированы по модулям.
Самая чувствительная часть такого сценария — не задерживать ответ на входящее событие. Поэтому мы быстро валидируем payload и сразу уводим тяжёлую работу в фон.
app.py
@app.route('/webhook/exolve', methods=['POST']) def handle_exolve_webhook(): if request.args.get('token') != app.config['WEBHOOK_SECRET']: return "Forbidden", 403 data = request.json or {} if data.get('event_type') != 'call.completed': return jsonify({"status": "ignored"}), 200 payload = data.get('payload', {}) call_id = payload.get('call_id') direction = payload.get('direction') client_phone = payload.get('from') if direction == 'inbound' else payload.get('to') audio_url = payload.get('recording_url') if not call_id or not client_phone or not audio_url: return "Bad Request", 400 if create_call_record(call_id, client_phone, audio_url): thread = threading.Thread( target=process_call_async, args=(call_id, client_phone, audio_url), ) thread.start() return jsonify({"status": "accepted"}), 202 return jsonify({"status": "already_processed"}), 200
На вход обработчик получает JSON от МТС Exolve, на выходе — короткий HTTP-ответ и фоновую задачу. Критичных мест три: проверить секрет до любой тяжёлой логики, не запускать пайплайн на промежуточных событиях и не создавать второй поток для обработанного call_id.
Важно вернуть именно 202 Accepted, а не ждать полного завершения пайплайна. HTTP-слой подтверждает, что мы приняли событие и взяли его в обработку, а не то, что успели получить транскрибацию, прогнать LLM и обновить CRM. Для MVP хватит threading, но в проде этот слой лучше заменить очередью задач: при нескольких воркерах и перезапусках процесса такой фоновой поток не даёт надёжных ретраев и контроля состояния.
Полный app.py: ▼
Скрытый текстimport json import logging import threading from flask import Flask, jsonify, render_template, request from bitrix24_crm import update_crm_deal from config import Config from database import create_call_record, get_db_connection, init_db, update_call_state from exolve_api import get_call_transcription from yandex_llm import extract_bant_data app = Flask(__name__) app.config.from_object(Config) logger = logging.getLogger("App") with app.app_context(): init_db() def process_call_async(call_id, client_phone, audio_url): logger.info("[%s] Start pipeline", call_id) transcript = get_call_transcription(call_id, app.config) if not transcript: update_call_state(call_id, "ERROR") return update_call_state(call_id, "STT_OK", transcript=transcript) bant_result = extract_bant_data(transcript, app.config) if not bant_result: update_call_state(call_id, "ERROR") return bant_json = json.dumps(bant_result, ensure_ascii=False) update_call_state(call_id, "LLM_OK", bant_result=bant_json) if update_crm_deal(client_phone, bant_result, app.config): update_call_state(call_id, "CRM_OK") else: update_call_state(call_id, "ERROR") @app.route('/webhook/exolve', methods=['POST']) def handle_exolve_webhook(): if request.args.get('token') != app.config['WEBHOOK_SECRET']: return "Forbidden", 403 data = request.json or {} if data.get('event_type') != 'call.completed': return jsonify({"status": "ignored"}), 200 payload = data.get('payload', {}) call_id = payload.get('call_id') direction = payload.get('direction') client_phone = payload.get('from') if direction == 'inbound' else payload.get('to') audio_url = payload.get('recording_url') if not call_id or not client_phone or not audio_url: return "Bad Request", 400 if create_call_record(call_id, client_phone, audio_url): thread = threading.Thread( target=process_call_async, args=(call_id, client_phone, audio_url), ) thread.start() return jsonify({"status": "accepted"}), 202 return jsonify({"status": "already_processed"}), 200 @app.route('/', methods=['GET']) def dashboard(): conn = get_db_connection() calls = conn.execute( 'SELECT * FROM calls ORDER BY created_at DESC LIMIT 20' ).fetchall() conn.close() parsed_calls = [] for call in calls: row = dict(call) row['bant_data'] = json.loads(call['bant_result']) if call['bant_result'] else None parsed_calls.append(row) return render_template('index.html', calls=parsed_calls) if __name__ == '__main__': app.run(debug=True, port=5000)
У MVP три внешние точки отказа: МТС Exolve, YandexGPT и Bitrix24. Если запись в CRM упадёт по сети, нужно понять, на каком шаге оборвался пайплайн. Поэтому храним не только call_id, но и статус обработки.
Для такого сценария полезно сразу договориться о цепочке состояний: PENDING -> STT_OK -> LLM_OK -> CRM_OK или ERROR. Этого хватает, чтобы глазами понять, где завис звонок, и не лезть сразу в логи. call_id играет две роли: ключ идемпотентности и correlation id, по которому потом можно связать запись в базе, сообщения в логах и ответ внешнего API.
database.py
def init_db(): conn = get_db_connection() conn.execute( ''' CREATE TABLE IF NOT EXISTS calls ( call_id TEXT PRIMARY KEY, client_phone TEXT, audio_url TEXT, status TEXT DEFAULT 'PENDING', transcript TEXT, bant_result TEXT, created_at INTEGER ) ''' ) conn.commit() conn.close() def create_call_record(call_id, client_phone, audio_url): conn = get_db_connection() try: conn.execute( 'INSERT INTO calls (call_id, client_phone, audio_url, created_at) VALUES (?, ?, ?, ?)', (call_id, client_phone, audio_url, int(time.time())), ) conn.commit() return True except sqlite3.IntegrityError: return False finally: conn.close()
Этот слой принимает данные из вебхука и возвращает бинарный результат: запись создали или запись с таким call_id существует. Именно первичный ключ даёт идемпотентность. Для MVP SQLite удобна тем, что её можно поднять без миграций, но в проде такой журнал лучше перенести в Postgres.
Полный database.py: ▼
Скрытый текстimport sqlite3 import time from config import Config def get_db_connection(): conn = sqlite3.connect(Config.DB_NAME) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_db_connection() conn.execute( ''' CREATE TABLE IF NOT EXISTS calls ( call_id TEXT PRIMARY KEY, client_phone TEXT, audio_url TEXT, status TEXT DEFAULT 'PENDING', transcript TEXT, bant_result TEXT, created_at INTEGER ) ''' ) conn.commit() conn.close() def create_call_record(call_id, client_phone, audio_url): conn = get_db_connection() try: conn.execute( 'INSERT INTO calls (call_id, client_phone, audio_url, created_at) VALUES (?, ?, ?, ?)', (call_id, client_phone, audio_url, int(time.time())), ) conn.commit() return True except sqlite3.IntegrityError: return False finally: conn.close() def update_call_state(call_id, status, transcript=None, bant_result=None): conn = get_db_connection() query = "UPDATE calls SET status = ?" params = [status] if transcript: query += ", transcript = ?" params.append(transcript) if bant_result: query += ", bant_result = ?" params.append(bant_result) query += " WHERE call_id = ?" params.append(call_id) conn.execute(query, params) conn.commit() conn.close()
После вебхука нам нужен не аудиофайл сам по себе, а текст, который можно отдать модели. В этом сценарии используем Call Transcribation API МТС Exolve и не отправляем запись в отдельный сервис распознавания речи.
exolve_api.py
def get_call_transcription(call_id: str, config) -> str | None: url = f"https://api.exolve.ru/voice/v1/calls/{call_id}/transcription" headers = {"Authorization": f"Bearer {config.EXOLVE_API_KEY}"} for attempt in range(5): try: response = requests.get(url, headers=headers, timeout=10) if response.status_code in (404, 422): time.sleep(10) continue response.raise_for_status() messages = response.json().get("messages", []) if not messages: time.sleep(5) continue lines = [] for msg in messages: role = msg.get("role", "") text = msg.get("text", "").strip() if text: speaker = "Менеджер" if role == "OUTBOUND" else "Клиент" lines.append(f"{speaker}: {text}") return "\n".join(lines) except requests.RequestException: time.sleep(5) return None
На вход эта функция получает call_id, на выходе — собранный диалог одной строкой на реплику. Здесь мы не делаем один запрос и не падаем сразу, потому что транскрибация после завершения звонка может появиться не мгновенно. Поэтому polling с несколькими попытками практичнее, чем лишняя сложность в оркестрации.
Полный exolve_api.py: ▼
Скрытый текстimport logging import time import requests logger = logging.getLogger("ExolveAPI") def get_call_transcription(call_id: str, config) -> str | None: url = f"https://api.exolve.ru/voice/v1/calls/{call_id}/transcription" headers = {"Authorization": f"Bearer {config.EXOLVE_API_KEY}"} for attempt in range(5): try: logger.info("Request transcription %s, attempt %s", call_id, attempt + 1) response = requests.get(url, headers=headers, timeout=10) if response.status_code in (404, 422): time.sleep(10) continue response.raise_for_status() messages = response.json().get("messages", []) if not messages: time.sleep(5) continue lines = [] for msg in messages: role = msg.get("role", "") text = msg.get("text", "").strip() if not text: continue speaker = "Менеджер" if role == "OUTBOUND" else "Клиент" lines.append(f"{speaker}: {text}") final_text = "\n".join(lines) logger.info("Transcription length: %s", len(final_text)) return final_text except requests.RequestException as error: logger.error("Exolve request failed: %s", error) time.sleep(5) logger.error("Can not get transcription for %s", call_id) return None
Если попросить модель “проанализировать звонок”, она начнёт писать свободным текстом. Для CRM это бесполезно. Поэтому задаём модели жёсткий формат и сразу ограничиваем, что она может вернуть: need, need_description, budget_estimated, decision_maker, timeline, intent_score, competitors, objections.
yandex_llm.py
SYSTEM_PROMPT = """ Извлеки из транскрипта BANT+. Если данных нет, верни null, "unknown" или []. Верни только валидный JSON: { "need": true, "need_description": "Описание боли клиента", "budget_estimated": "150000-200000", "decision_maker": "unknown", "timeline": "30d", "intent_score": "med", "competitors": [], "objections": [] } """ def extract_bant_data(transcript: str, config) -> dict | None: if not transcript: return None payload = { "modelUri": config.MODEL_URI, "completionOptions": { "stream": False, "temperature": 0.1, "maxTokens": "1500", }, "messages": [ {"role": "system", "text": SYSTEM_PROMPT}, {"role": "user", "text": f"Транскрипция:\n{transcript[:15000]}"}, ], } for attempt in range(3): try: response = requests.post( config.YANDEX_GPT_URL, headers={ "Authorization": f"Api-Key {config.YANDEX_API_KEY}", "x-folder-id": config.YANDEX_FOLDER_ID, "Content-Type": "application/json", }, json=payload, timeout=20, ) response.raise_for_status() response_json = response.json() raw_text = response_json.get("result", {}).get("alternatives", [{}])[0].get("message", {}).get("text", "") if not raw_text: continue clean_text = raw_text.strip().removeprefix("```json").removesuffix("```").strip() return json.loads(clean_text) except (json.JSONDecodeError, requests.RequestException): if attempt == 2: return None
Эта функция получает текст разговора и возвращает готовую структуру для CRM. В этом фрагменте важны три вещи: temperature=0.1, жёсткая JSON-схема и очистка Markdown-артефактов. В таком режиме модель отвечает стабильнее, усечение транскрипта защищает от переполнения контекста, а результат можно сразу маппить в поля CRM.
Полный yandex_llm.py: ▼
Скрытый текстimport json import logging import time import requests logger = logging.getLogger("YandexLLM") SYSTEM_PROMPT = """ Ты анализируешь транскрипт B2B-звонка и извлекаешь BANT+. Правила: 1. Не придумывай факты. Если информации нет, верни null, "unknown" или []. 2. need: true или false. Если need=false, need_description=null. 3. budget_estimated: только формат "min-max" цифрами или "unknown". 4. decision_maker: "yes", "no", "unknown". 5. timeline: "ASAP", "30d", "90d", "unknown". 6. intent_score: "high", "med", "low". 7. competitors и objections заполняй только если они явно звучат в звонке. Верни только валидный JSON: { "need": true, "need_description": "Описание боли клиента (до 150 символов)", "budget_estimated": "150000-200000", "decision_maker": "unknown", "timeline": "30d", "intent_score": "med", "competitors": [], "objections": [] } """ def extract_bant_data(transcript: str, config) -> dict | None: if not transcript: return None safe_transcript = transcript[:15000] headers = { "Authorization": f"Api-Key {config.YANDEX_API_KEY}", "x-folder-id": config.YANDEX_FOLDER_ID, "Content-Type": "application/json", } payload = { "modelUri": config.MODEL_URI, "completionOptions": { "stream": False, "temperature": 0.1, "maxTokens": "1500", }, "messages": [ {"role": "system", "text": SYSTEM_PROMPT}, {"role": "user", "text": f"Транскрипция:\n{safe_transcript}"}, ], } for attempt in range(3): try: response = requests.post( config.YANDEX_GPT_URL, headers=headers, json=payload, timeout=20, ) if response.status_code == 429: time.sleep(2 ** attempt) continue response.raise_for_status() raw_text = response.json()['result']['alternatives'][0]['message']['text'] clean_text = raw_text.strip().removeprefix("```json").removesuffix("```").strip() return json.loads(clean_text) except (json.JSONDecodeError, requests.RequestException) as error: logger.error("LLM error, attempt %s: %s", attempt + 1, error) if attempt == 2: return None return None
После работы модели нам не нужен свободный текст. Нужен предсказуемый маппинг полей в CRM. Сделаем это через пользовательские поля сделки. Минимальный набор полей такой: описание потребности, бюджет, ЛПР, сроки, интерес, конкуренты и возражения.
bitrix24_crm.py
def normalize_phone(phone: str) -> str: digits = re.sub(r'\D', '', phone or '') if digits.startswith('8') and len(digits) == 11: digits = '7' + digits[1:] return digits def update_crm_deal(client_phone: str, bant_ dict, config) -> bool: clean_phone = normalize_phone(client_phone) search_url = f"{config.BITRIX24_WEBHOOK_URL}crm.deal.list.json" search_resp = requests.post( search_url, json={"filter": {"=CONTACT.PHONE": clean_phone}}, timeout=10, ) search_resp.raise_for_status() deals = search_resp.json().get("result", []) if not deals: return False payload = { "ID": deals[0]["ID"], "FIELDS": { "UF_CRM_BANT_NEED": bant_data.get("need_description") or "", "UF_CRM_BANT_BUDGET": bant_data.get("budget_estimated") or "", "UF_CRM_BANT_DM": bant_data.get("decision_maker") or "unknown", "UF_CRM_BANT_TIMELINE": bant_data.get("timeline") or "", "UF_CRM_INTENT": bant_data.get("intent_score", "low").upper(), "UF_CRM_COMPETITORS": ", ".join(bant_data.get("competitors", [])), "UF_CRM_OBJECTIONS": ", ".join(bant_data.get("objections", [])), }, } update_url = f"{config.BITRIX24_WEBHOOK_URL}crm.deal.update.json" response = requests.post(update_url, json=payload, timeout=10) response.raise_for_status() return "error" not in response.json()
На вход функции приходит номер клиента и JSON от модели, на выходе — факт успешного обновления сделки. Нормализация номера обязательна: без неё Bitrix24 часто не находит запись.
В текущем решении новая сделка не создаётся: сервис обновляет существующую, найденную по номеру телефона клиента.
Маппинг полей в этом примере прямой:
|
Поле BANT+ |
Поле сделки Bitrix24 |
|
need_description |
UF_CRM_BANT_NEED |
|
budget_estimated |
UF_CRM_BANT_BUDGET |
|
decision_maker |
UF_CRM_BANT_DM |
|
timeline |
UF_CRM_BANT_TIMELINE |
|
intent_score |
UF_CRM_INTENT |
|
competitors |
UF_CRM_COMPETITORS |
|
objections |
UF_CRM_OBJECTIONS |
Полный bitrix24_crm.py: ▼
Скрытый текстimport logging import re import requests logger = logging.getLogger("Bitrix24") def normalize_phone(phone: str) -> str: if not phone: return "" digits = re.sub(r'\D', '', phone) if digits.startswith('8') and len(digits) == 11: digits = '7' + digits[1:] return digits def update_crm_deal(client_phone: str, bant_ dict, config) -> bool: if not client_phone or not bant_ return False clean_phone = normalize_phone(client_phone) search_url = f"{config.BITRIX24_WEBHOOK_URL}crm.deal.list.json" try: search_resp = requests.post( search_url, json={"filter": {"=CONTACT.PHONE": clean_phone}}, timeout=10, ) search_resp.raise_for_status() deals = search_resp.json().get("result", []) if not deals: logger.warning("Deal not found for %s", clean_phone) return False except requests.RequestException as error: logger.error("Deal search failed: %s", error) return False payload = { "ID": deals[0]["ID"], "FIELDS": { "UF_CRM_BANT_NEED": bant_data.get("need_description") or "", "UF_CRM_BANT_BUDGET": bant_data.get("budget_estimated") or "", "UF_CRM_BANT_DM": bant_data.get("decision_maker") or "unknown", "UF_CRM_BANT_TIMELINE": bant_data.get("timeline") or "", "UF_CRM_INTENT": bant_data.get("intent_score", "low").upper(), "UF_CRM_COMPETITORS": ", ".join(bant_data.get("competitors", [])), "UF_CRM_OBJECTIONS": ", ".join(bant_data.get("objections", [])), }, } update_url = f"{config.BITRIX24_WEBHOOK_URL}crm.deal.update.json" try: response = requests.post(update_url, json=payload, timeout=10) response.raise_for_status() if "error" in response.json(): logger.error("Bitrix API error: %s", response.json().get("error_description")) return False return True except requests.RequestException as error: logger.error("Deal update failed: %s", error) return False
После сборки всех частей остаётся проверить, что событие доходит до приложения и проходит через весь пайплайн. Для локального теста достаточно поднять Flask и отправить тестовый вебхук на маршрут /webhook/exolve.
python app.py
Если приложение запущено локально, можно пробросить туннель через ngrok и отправить тестовый payload. В ответе ожидаем 202, а в журнале или в SQLite — новую запись со статусом PENDING, которая затем перейдёт в STT_OK, LLM_OK и CRM_OK.
Проверка выглядит так:
маршрут /webhook/exolve отвечает 202 Accepted на валидный call.completed;
в SQLite появляется запись с нужным call_id;
статус звонка проходит цепочку PENDING -> STT_OK -> LLM_OK -> CRM_OK;
в Bitrix24 обновляются пользовательские поля сделки, найденной по телефону;
в HTML-журнале видно и итоговый статус, и извлечённые поля BANT+.
Полный test_webhook.py: ▼
Скрытый текстimport requests NGROK_URL = "https://1234.ngrok-free.app/webhook/exolve?token=bant-super-secret-token" mock_payload = { "event_type": "call.completed", "payload": { "call_id": "test_local_001", "direction": "inbound", "from": "+79991234567", "to": "+74950000000", "recording_url": "https://api.exolve.ru/v1/recordings/test.mp3", }, } print("Отправляем тестовый звонок...") print("Статус:", requests.post(NGROK_URL, json=mock_payload).status_code)
Для быстрой проверки результата без захода в SQLite в проекте есть HTML-шаблон с последними записями.
Полный templates/index.html: ▼
Скрытый текст<!DOCTYPE html> <html> <head> <title>BANT Analyzer MVP</title> <style> body { font-family: sans-serif; background: #f0f2f5; padding: 20px; } .card { background: white; padding: 20px; border-radius: 8px; margin-bottom: 15px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } .badge { padding: 4px 8px; border-radius: 4px; font-weight: bold; font-size: 12px; } .CRM_OK { background: #d4edda; color: #155724; } .ERROR { background: #f8d7da; color: #721c24; } .PENDING, .STT_OK, .LLM_OK { background: #fff3cd; color: #856404; } </style> </head> <body> <h2>Журнал AI-анализа звонков</h2> {% for call in calls %} <div> <div> <strong>Звонок ID:</strong> {{ call.call_id }} | <strong>Телефон:</strong> {{ call.client_phone }} | <span>{{ call.status }}</span> </div> {% if call.bant_data %} <div> <div> <strong>Потребность:</strong> {{ call.bant_data.need_description or 'Не выявлено' }}<br> <strong>Бюджет:</strong> {{ call.bant_data.budget_estimated }}<br> <strong>Интерес:</strong> {{ call.bant_data.intent_score | upper }} </div> <div> <strong>ЛПР:</strong> {{ call.bant_data.decision_maker | upper }}<br> <strong>Сроки:</strong> {{ call.bant_data.timeline }}<br> <strong>Возражения:</strong> {{ call.bant_data.objections | join(', ') }} </div> </div> {% endif %} </div> {% endfor %} </body> </html>
У MVP есть несколько точек для следующего шага.
Фоновую обработку через threading лучше заменить очередью задач, чтобы контролировать ретраи и не терять задачи при перезапусках
SQLite стоит заменить на Postgres, если пайплайн будет работать под нагрузкой и с параллельной обработкой
Поиск сделки по CONTACT.PHONE стоит заменить на связку контакт -> активная сделка, если в CRM у контакта может быть несколько сделок
Проверку через json.loads стоит дополнить схемной валидацией, чтобы контролировать не только формат, но и допустимые значения полей
Локальные ретраи внутри модулей стоит вынести в общую механику переобработки, чтобы звонки не зависали в ERROR
Логи стоит расширить: сохранять call_id, шаг пайплайна, HTTP-статус и причину ошибки
Для хранения транскриптов стоит заранее определить политику по персональным данным и срокам хранения
Мы собрали MVP-сервис, который получает событие о завершённом звонке, забирает транскрибацию из МТС Exolve, извлекает BANT+ через YandexGPT и записывает результат в Bitrix24. Такой сценарий снижает потери на двух этапах: когда менеджер не собрал часть квалификации в разговоре и когда детали теряются при ручном заполнении CRM. Следующий логичный шаг — вынести фоновую обработку в очередь задач, добавить схемную валидацию ответа модели и усилить логику поиска сущностей в CRM, не меняя общий контракт пайплайна.
Код на гитхабе.
Источник


