Yoda AI Gelişmiş Teknik Dökümantasyon
Bu dökümantasyon, ozelzeka.com platformunun arka planında çalışan yüksek performanslı, güvenli ve ölçeklenebilir yapay zeka mimarisini ve ERP entegrasyon katmanlarını detaylandırmaktadır.
Gemini 2.5 Pro/Flash
Google Cloud Vertex AI üzerinden en son nesil dil modelleri ile akıllı analiz ve Text-to-SQL üretimi.
Qdrant & PostgreSQL
Semantik arama (RAG) için Qdrant vektör veritabanı ve ilişkisel analizler için PostgreSQL hibrit mimarisi.
Celery & Redis
Asenkron görevler, CDC veri akışları ve milisaniyeler seviyesinde semantik önbellekleme.
1. Veri Akışı ve Qdrant Otomasyonu (CDC & Celery)
PostgreSQL veritabanına yeni bir metin verisi (müşteri notu, sözleşme açıklaması vb.) eklendiğinde, insan müdahalesi olmadan otomatik olarak vektörleştirilip Qdrant'a eklenmesini sağlayan asenkron CDC (Change Data Capture) mimarisi.
PostgreSQL Trigger Yapısı
CREATE OR REPLACE FUNCTION notify_new_text_insert()
RETURNS TRIGGER AS $$
DECLARE
payload JSON;
BEGIN
payload = json_build_object(
'id', NEW.id,
'table_name', TG_TABLE_NAME,
'text_column', TG_ARGV[0]
);
PERFORM pg_notify('qdrant_cdc_channel', payload::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
Celery Görevi (Python)
@celery_app.task(name="index_new_text_to_qdrant", bind=True, max_retries=5)
def index_new_text_to_qdrant(self, record_id: int, table_name: str, text_column: str):
# Güvenlik Beyaz Listesi (SQL Injection Koruması)
ALLOWED_TABLES = {"customer_notes", "contract_details"}
if table_name not in ALLOWED_TABLES:
raise ValueError("Güvenlik İhlali!")
...
vector = embeddings_model.embed_query(text_content)
qdrant_client.upsert(collection_name="yoda_semantic_notes", points=[...])
2. Agentic Routing (Çoklu Ajan Yönlendirmesi)
Kullanıcıdan gelen promptu analiz edip, sorunun sadece Qdrant'a mı (metin/açıklama araması), sadece PostgreSQL'e mi (Text-to-SQL üzerinden bakiye/sayısal analiz) yoksa her ikisine birden gidip sonuçların LLM tarafından birleştirileceğine mi karar veren LangGraph Agent yapısı.
LangGraph Akış Tanımı
workflow = StateGraph(AgentState)
workflow.add_node("router", router_node)
workflow.add_node("qdrant_search", qdrant_node)
workflow.add_node("postgres_query", postgres_node)
workflow.add_node("synthesizer", synthesizer_node)
workflow.set_entry_point("router")
workflow.add_conditional_edges("router", route_decision, {
"qdrant": "qdrant_search",
"postgres": "postgres_query",
"hybrid": "synthesizer"
})
3. Redis Semantic Caching
Aynı anlama gelen soruların LLM'e gitmeden Redis önbelleğinden doğrudan yanıtlanması için LangChain Redis Semantic Cache entegrasyonu. Bu sayede maliyetler düşürülür ve yanıt süreleri milisaniyeler seviyesine indirilir.
Semantik Önbellek Yapılandırması
from langchain_community.cache import RedisSemanticCache
from langchain_google_vertexai import VertexAIEmbeddings
import langchain
set_llm_cache(RedisSemanticCache(
redis_url="redis://localhost:6379/0",
embedding=VertexAIEmbeddings(model_name="text-multilingual-embedding-002"),
score_threshold=0.12 # 0.88 ve üzeri benzerlikler önbellekten döner
))
4. Row-Level Security (Plasiyer Yetkilendirme)
PostgreSQL'de her plasiyerin yalnızca kendi müşterilerine ait verileri görebilmesini garanti eden RLS (Row-Level Security) SQL politikaları ve Python tarafında dinamik rol yönetimi.
PostgreSQL RLS Politikası
ALTER TABLE customers ENABLE ROW LEVEL SECURITY;
CREATE POLICY customer_plasiyer_isolation_policy ON customers
USING (plasiyer_id = NULLIF(current_setting('app.current_plasiyer_id', true), '')::integer);
Python Dinamik Bağlantı Yönetimi
class RLSDatabaseManager:
def get_connection(self, plasiyer_id: int):
conn = psycopg2.connect(self.db_url)
with conn.cursor() as cursor:
# Oturum bazlı plasiyer ID'sini güvenli bir şekilde set et
cursor.execute("SET LOCAL app.current_plasiyer_id = %s", (plasiyer_id,))
return conn
5. Maliyet İzleme (Cost Tracking) ve Gözlemlenebilirlik
Vertex AI üzerinden atılan her istekte (Gemini 2.5) kullanılan input ve output token sayılarını yakalayan, maliyeti hesaplayan ve PostgreSQL'e kaydeden Callback Handler yapısı.
Maliyet Hesaplama Formülü
- Gemini 2.5 Pro: Input: $1.25 / 1M Token | Output: $10.00 / 1M Token
- Gemini 2.5 Flash: Input: $0.30 / 1M Token | Output: $2.50 / 1M Token
Callback Handler Sınıfı
class VertexAICostTrackingCallbackHandler(BaseCallbackHandler):
def on_llm_end(self, response: LLMResult, **kwargs):
token_usage = response.llm_output.get("token_usage", {})
input_tokens = token_usage.get("prompt_tokens", 0)
output_tokens = token_usage.get("completion_tokens", 0)
# Maliyet hesaplama ve PostgreSQL 'llm_usage_logs' tablosuna kaydetme
...
6. ERP Entegrasyon Mimarisi
Netsis, Logo, Mikro, SAP gibi farklı ERP sistemlerinden güvenli, artımlı (incremental) ve dinamik veri çekme, şema eşleştirme (AI Schema Matcher) ve senkronizasyon altyapısı.
Çoklu Bağlayıcı Katmanı
SQL tabanlı (MSSQL, PostgreSQL) ve REST API tabanlı ERP sistemleri için ortak soyutlama katmanı.
AI Şema Eşleştirici
Gemini 2.5 kullanarak bilinmeyen ERP tablolarını otomatik analiz eder ve Yoda şemasına eşler.
Artımlı Senkronizasyon Görevi (Celery)
@celery_app.task(name="sync_erp_entity", bind=True, max_retries=3)
def sync_erp_entity(self, erp_name: str, entity_name: str, query_or_endpoint: str, target_table: str, last_sync_key: str):
sync_manager = SyncManager(PG_DB_URL)
connector = sync_manager.get_connector(erp_name)
# Sadece son senkronizasyondan sonra değişen verileri çek
raw_records = connector.fetch_incremental(query_or_endpoint, last_sync_key, last_sync_value)
# Verileri hedef tabloya güvenli bir şekilde UPSERT et
sync_manager.write_to_target(target_table, mapped_records)
PostgreSQL ERP Şema Tanımları
-- ERP Sistemleri Tanım Tablosu
CREATE TABLE IF NOT EXISTS erp_systems (
id SERIAL PRIMARY KEY,
name VARCHAR(100) UNIQUE NOT NULL, -- Netsis, Logo, Mikro, SAP vb.
connector_type VARCHAR(50) NOT NULL, -- sql, rest
config JSONB NOT NULL, -- Bağlantı bilgileri (şifreli veya güvenli saklanmalı)
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Senkronizasyon Durum (State) Tablosu
CREATE TABLE IF NOT EXISTS erp_sync_states (
id SERIAL PRIMARY KEY,
erp_id INTEGER REFERENCES erp_systems(id) ON DELETE CASCADE,
entity_name VARCHAR(100) NOT NULL, -- customers, orders, invoices vb.
last_sync_key VARCHAR(100) NOT NULL, -- update_date, last_id vb.
last_sync_value VARCHAR(255) NOT NULL, -- Son senkronize edilen değer
status VARCHAR(50) DEFAULT 'idle', -- running, idle, failed
last_error TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(erp_id, entity_name)
);
Güvenli Parametrik UPSERT Yazıcı (SyncManager)
# Güvenli Hedef Tablo Beyaz Listesi (Whitelist)
ALLOWED_TARGET_TABLES = {"customers", "orders", "invoices", "products"}
if target_table not in ALLOWED_TARGET_TABLES:
raise ValueError(f"Güvenlik İhlali: Geçersiz hedef tablo: {target_table}")
# Sütun adlarının sadece harf, rakam ve alt çizgiden oluştuğunu doğrula (SQL Injection koruması)
for col in columns:
if not col.replace("_", "").isalnum():
raise ValueError(f"Güvenlik İhlali: Geçersiz sütun adı: {col}")
# Güvenli parametrik UPSERT sorgusu
query = f"""
INSERT INTO {target_table} ({col_names})
VALUES ({col_placeholders})
ON CONFLICT (id) DO UPDATE SET {update_clause};
"""