- AI-Powered Severity Analysis: Classifies logs using BERT.
- Anomaly Detection: Identifies unusual patterns with KMeans clustering.
- Self-Healing Automation: Executes scripts to mitigate critical issues.
- Slack Integration: Sends real-time notifications for critical logs.
- Prometheus Monitoring: Tracks bugs and RCA metrics.
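The pipeline consumes JSON log events from Kafka. For reference, here is a minimal producer sketch emitting an event in the shape `process_log` below expects; the field names `message`, `severity`, and `source` come from the consumer code, while the broker address and sample values are placeholders:

```python
# Hypothetical producer: emits one log event in the shape the consumer expects.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],  # placeholder broker address
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("application_logs", {
    "message": "NullPointerException in PaymentService.charge()",
    "severity": "ERROR",
    "source": "payment-service",
})
producer.flush()
```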
```python
import json
import logging
import os
import subprocess

import numpy as np
import pandas as pd
from causalinference import CausalModel
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer
from prometheus_client import Counter, start_http_server
from sklearn.cluster import KMeans
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Prometheus metrics
bug_detected_counter = Counter("bug_detected", "Number of bugs detected in logs")
root_cause_counter = Counter("root_cause_analysis", "Number of root causes identified")

# Kafka consumer: reads JSON log events from the application_logs topic
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "localhost:9092")
TOPIC = os.getenv("KAFKA_TOPIC", "application_logs")
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=[KAFKA_BROKER],
    value_deserializer=lambda x: json.loads(x.decode("utf-8")),
)

# Elasticsearch: stores classified logs for reporting
ELASTIC_HOST = os.getenv("ELASTIC_HOST", "http://localhost:9200")
es = Elasticsearch([ELASTIC_HOST])

# Slack: real-time notifications for critical logs
SLACK_TOKEN = os.getenv("SLACK_TOKEN")
SLACK_CHANNEL = os.getenv("SLACK_CHANNEL", "#bug-notifications")
slack_client = WebClient(token=SLACK_TOKEN)

# BERT severity classifier. A sequence-classification head is needed to get
# severity logits; bert-base-uncased is only a base checkpoint, so the head
# is randomly initialized until fine-tuned on labeled logs
# (e.g., DEBUG=0, INFO=1, ERROR=2, CRITICAL=3).
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
bert_model = TFAutoModelForSequenceClassification.from_pretrained("bert-base-uncased", num_labels=4)
```
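The classification head above starts untrained, so its predictions are meaningless out of the box. A minimal fine-tuning sketch with Keras, assuming a small labeled set of log messages (the two examples and the 2e-5 learning rate are illustrative; real training needs a proper dataset):

```python
# Hypothetical fine-tuning sketch: train the severity head on labeled logs.
# The examples below are illustrative placeholders.
import tensorflow as tf

train_texts = ["User logged in successfully", "NullPointerException in PaymentService.charge()"]
train_labels = [1, 2]  # INFO, ERROR

enc = tokenizer(train_texts, padding=True, truncation=True, max_length=128, return_tensors="tf")
bert_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
)
bert_model.fit(dict(enc), tf.constant(train_labels), epochs=1)
bert_model.save_pretrained("severity-bert")  # reload this checkpoint in production
```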
Root Cause Analysis

```python
def root_cause_analysis(data):
    logging.info("Performing Root Cause Analysis...")
    # CausalModel expects a numeric outcome Y, a binary (0/1) treatment D,
    # and numeric covariates X; adjust the column names to match your logs.
    causal = CausalModel(
        Y=data["severity_label"].values,
        D=data["source_system"].values,
        X=data.drop(columns=["severity_label", "source_system"]).values,
    )
    causal.est_via_matching()
    root_cause_counter.inc()
    return causal.estimates
```
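`root_cause_analysis` is never called from the stream itself; a hedged sketch of invoking it, assuming you first encode `source_system` as a binary treatment flag (causalinference requires a 0/1 treatment) and keep only numeric covariates. All column values here are illustrative:

```python
# Hypothetical invocation: the data below is made up to show the expected shape.
import pandas as pd

df = pd.DataFrame({
    "severity_label": [3, 1, 2, 0, 3, 1],
    "source_system":  [1, 0, 1, 0, 1, 0],  # 1 = suspect service, 0 = baseline
    "latency_ms":     [870, 120, 640, 95, 910, 150],  # numeric covariates
    "retries":        [3, 0, 2, 0, 4, 1],
})

estimates = root_cause_analysis(df)
print(estimates)  # matching estimate of the treatment's effect on severity
```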
Anomaly Detection

```python
def cluster_anomalies(logs):
    logging.info("Clustering logs for anomalies...")
    # KMeans only accepts numeric input, so drop text columns first.
    features = logs.select_dtypes(include=[np.number])
    kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
    clusters = kmeans.fit_predict(features)
    # Example heuristic: treat cluster 2 as the anomalous group. In practice,
    # inspect cluster sizes and centroids to pick the anomalous cluster.
    anomalies = logs[clusters == 2]
    return anomalies
```
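The documents stored below are mostly text, so clustering needs a feature-extraction step first. A minimal sketch using TF-IDF over the `message` field (scikit-learn's `TfidfVectorizer`; `max_features=50` is an arbitrary cap):

```python
# Hypothetical feature extraction: turn log messages into numeric TF-IDF
# vectors so KMeans has something to cluster on.
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer

def vectorize_messages(logs: pd.DataFrame) -> pd.DataFrame:
    vectorizer = TfidfVectorizer(max_features=50)  # arbitrary vocabulary cap
    tfidf = vectorizer.fit_transform(logs["message"]).toarray()
    features = pd.DataFrame(tfidf, index=logs.index)
    features["severity_label"] = logs["severity_label"].values
    return features
```

Passing `vectorize_messages(data)` into `cluster_anomalies` keeps the original row index, so the returned anomalies map straight back to the source logs.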
Notification System

```python
def notify_slack(message):
    try:
        slack_client.chat_postMessage(channel=SLACK_CHANNEL, text=message)
    except SlackApiError as e:
        logging.error(f"Slack API error: {e.response['error']}")
```
Self-Healing Automation

```python
def run_self_healing_script(script_path):
    logging.info(f"Running self-healing script: {script_path}")
    try:
        subprocess.run(["bash", script_path], check=True)
    except subprocess.CalledProcessError as e:
        logging.error(f"Error executing self-healing script: {e}")
```
Log Processing

```python
def process_log(log):
    try:
        message = log['message']
        severity = log['severity']
        source = log['source']

        # Tokenize and classify severity with BERT. With an untrained head
        # the predicted label is meaningless; fine-tune the model first.
        tokens = tokenizer([message], padding=True, truncation=True, max_length=128, return_tensors="tf")
        outputs = bert_model(tokens)
        severity_label = int(np.argmax(outputs.logits[0]))

        # Index the classified log for later reporting.
        es.index(index="bug_logs", body={
            "message": message,
            "severity": severity,
            "severity_label": severity_label,
            "source": source
        })

        # Notification & healing for ERROR (2) or CRITICAL (3) labels.
        if severity_label >= 2:
            notify_slack(f"Bug Detected: {message}, Severity: {severity_label}")
            run_self_healing_script("/path/to/healing_script.sh")  # Customize path
            bug_detected_counter.inc()
    except Exception as e:
        logging.error(f"Error processing log: {e}")
```
```python
def process_logs_stream():
    logging.info("Starting log stream processing...")
    for message in consumer:
        log = message.value
        process_log(log)

def generate_reports():
    logging.info("Generating advanced reports...")
    logs = es.search(index="bug_logs", body={"query": {"match_all": {}}}, size=10000)
    data = pd.DataFrame([log['_source'] for log in logs['hits']['hits']])
    # Raw documents expose only severity_label as a numeric column; for
    # richer clustering, vectorize the message text first (see the TF-IDF
    # sketch above).
    anomaly_logs = cluster_anomalies(data)
    anomaly_logs.to_csv("anomaly_logs.csv", index=False)
```
```python
if __name__ == "__main__":
    # Start Prometheus Monitoring
    start_http_server(8000)
    # Process Logs from Kafka
    process_logs_stream()
```
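Once running, the counters are exposed on port 8000 in the Prometheus text format (`prometheus_client` appends `_total` to counter names). A quick hypothetical check with `requests`, run from a second process since the consumer loop blocks:

```python
# Hypothetical metrics check: scrape the endpoint started by start_http_server.
import requests

metrics = requests.get("http://localhost:8000/metrics", timeout=5).text
for line in metrics.splitlines():
    if line.startswith(("bug_detected", "root_cause_analysis")):
        print(line)
```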