This guide provides a step-by-step approach to developing a working prototype of an AI-powered surveillance system using YOLOv8 for real-time threat detection.
- Project Overview
- Environment Setup
- Data Collection and Preparation
- Model Selection and Training
- Building the Detection System
- Alert System Implementation
- Storage and Logging
- Testing and Optimization
- Deployment
- Resources and References
This academic project aims to develop an AI-based surveillance system that enhances security through real-time object detection and threat analysis using YOLOv8. The system will process video streams, detect potential threats, and alert security personnel when suspicious activities are identified.
- Real-time object detection using YOLOv8
- Detection of specific threats (weapons, suspicious objects, etc.)
- Automated alert system
- Video recording and storage for evidence
- Dashboard for monitoring and analysis
```
# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Create the project structure
mkdir -p code/{models,utils,logs}
mkdir -p code/data/{raw,processed}
mkdir -p config
touch code/main.py
touch code/utils/camera.py
touch code/utils/detector.py
touch code/utils/alerts.py
```

The resulting project structure:

```
ai-surveillance/
├── code/
│   ├── data/
│   │   ├── raw/
│   │   └── processed/
│   ├── models/
│   ├── utils/
│   │   ├── camera.py
│   │   ├── detector.py
│   │   └── alerts.py
│   ├── main.py
│   ├── train_model.py
│   └── evaluate_model.py
├── config/
│   ├── config.yaml      # Main configuration file
│   ├── dataset.yaml     # Dataset configuration
│   └── settings.py      # Settings management module
├── logs/
└── recordings/
```
```
pip install ultralytics              # YOLOv8 library
pip install opencv-python            # Computer vision library
pip install numpy pandas matplotlib  # Data processing and visualization
pip install Flask                    # For the web dashboard (optional)
pip install requests                 # For API communications
pip install python-dotenv            # For environment variables
pip install pyyaml                   # For configuration files
```

The project uses a pre-organized dataset available on Google Drive (see the `gdown` command below). The dataset is structured as follows:
```
dataset/
├── test/          # Test dataset
├── train/         # Training dataset
└── validation/    # Validation dataset
```

To prepare the dataset:
```
# Create the dataset directory
mkdir -p dataset

# Install gdown for Google Drive downloads
pip install gdown

# Download and extract the dataset (you'll need the direct file IDs from the Google Drive link)
cd dataset
gdown --folder https://drive.google.com/drive/folders/1q6uYxRqtkl0vvT9_O3jOcwR7UMTIrpeA

# Verify the dataset structure
ls -R
```

The system uses a centralized configuration approach, with all configuration files located in the `config/` directory:
- `config.yaml`: Main system configuration
- `dataset.yaml`: Dataset configuration
- `settings.py`: Configuration management module
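As an illustration, here is a minimal `config.yaml` mirroring the defaults that `settings.py` writes out when no file exists, followed by a `dataset.yaml` in the standard Ultralytics format. The paths and class names in `dataset.yaml` are placeholders; use the ones your dataset actually defines.

```yaml
# config/config.yaml (example; values mirror the defaults in settings.py)
camera:
  source: 0
  width: 640
  height: 480
  fps: 30
model:
  path: yolov8n.pt
  conf_threshold: 0.25
  iou_threshold: 0.45
alerts:
  cooldown: 60
  email_enabled: false
```

```yaml
# config/dataset.yaml (example; paths and class names depend on your dataset)
path: ../dataset        # dataset root
train: train/images
val: validation/images
test: test/images
names:
  0: weapon
  1: suspicious_package
  2: masked_face
```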
To run the system with a custom configuration:

```
python code/main.py --config config/config.yaml
```

To train the model:

```
python code/train_model.py --data config/dataset.yaml
```

To evaluate the model:

```
python code/evaluate_model.py --data config/dataset.yaml
```

```python
# utils/detector.py
from ultralytics import YOLO

def load_model(model_path='yolov8n.pt'):
    """
    Load a YOLOv8 model.

    Args:
        model_path: Path to a pre-trained or custom YOLOv8 model

    Returns:
        Loaded YOLO model
    """
    return YOLO(model_path)

def detect_objects(model, image, conf_threshold=0.25):
    """
    Detect objects in an image.

    Args:
        model: YOLOv8 model
        image: Input image
        conf_threshold: Confidence threshold for detections

    Returns:
        Detection results for the single input image
    """
    results = model.predict(image, conf=conf_threshold)
    return results[0]
```
```python
# train_model.py
from ultralytics import YOLO

# Load a pre-trained model
model = YOLO('yolov8n.pt')  # YOLOv8 nano model

# Train the model on the custom dataset
results = model.train(
    data='config/dataset.yaml',
    epochs=100,
    imgsz=640,
    batch=16,
    name='surveillance_model'
)

# Export the model for inference
model.export(format='onnx')  # ONNX format for cross-platform compatibility
```
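The guide references `code/evaluate_model.py` but does not list it. Here is a minimal sketch of what it could look like, assuming the standard Ultralytics `model.val()` API; the default `--weights` path below is an assumption based on where `model.train(name='surveillance_model')` typically saves its best checkpoint.

```python
# evaluate_model.py -- minimal sketch (assumed implementation; adjust paths to your setup)
import argparse

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description='Evaluate a trained YOLOv8 model')
    parser.add_argument('--data', default='config/dataset.yaml',
                        help='Dataset configuration file')
    parser.add_argument('--weights', default='runs/detect/surveillance_model/weights/best.pt',
                        help='Path to trained weights (assumed default location)')
    return parser.parse_args(argv)

def evaluate(data, weights):
    """Run YOLOv8 validation and report mAP metrics."""
    from ultralytics import YOLO  # local import keeps parse_args usable without ultralytics installed
    model = YOLO(weights)
    metrics = model.val(data=data)  # evaluates on the dataset's validation split
    print(f"mAP50-95: {metrics.box.map:.3f}")
    print(f"mAP50:    {metrics.box.map50:.3f}")
    return metrics

# Usage: args = parse_args(); evaluate(args.data, args.weights)
```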
```python
# utils/camera.py
import cv2

class Camera:
    def __init__(self, source=0):
        """
        Initialize the camera source.

        Args:
            source: Camera index or video file path
        """
        self.source = source
        self.cap = None

    def start(self):
        """Start capturing from the camera."""
        self.cap = cv2.VideoCapture(self.source)
        if not self.cap.isOpened():
            raise ValueError(f"Unable to open video source {self.source}")
        return True

    def read_frame(self):
        """Read a frame from the camera, restarting the capture if needed."""
        if self.cap is None or not self.cap.isOpened():
            self.start()
        ret, frame = self.cap.read()
        return frame if ret else None

    def release(self):
        """Release the camera resource."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None
```
```python
# main.py
import cv2

from utils.camera import Camera
from utils.detector import load_model, detect_objects
from utils.alerts import AlertSystem

def main():
    # Initialize components
    camera = Camera(source=0)  # Webcam; change to an IP camera URL or video file as needed
    model = load_model('models/yolov8n.pt')  # Pre-trained model or your custom model
    alert_system = AlertSystem()

    # Start the detection loop
    try:
        camera.start()
        print("Surveillance system started. Press 'q' to quit.")
        while True:
            # Read a frame
            frame = camera.read_frame()
            if frame is None:
                print("Failed to grab frame")
                break

            # Detect objects
            results = detect_objects(model, frame)

            # Process the results
            for box in results.boxes:
                x1, y1, x2, y2 = map(int, box.xyxy[0])
                conf = float(box.conf)
                cls = int(box.cls)
                class_name = results.names[cls]

                # Draw the bounding box and label
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(frame, f"{class_name} {conf:.2f}", (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                # Check for threats (these class names come from your custom-trained model)
                if class_name in ['weapon', 'suspicious_package', 'masked_face']:
                    alert_system.trigger_alert(
                        threat_type=class_name,
                        confidence=conf,
                        image=frame
                    )

            # Display the frame
            cv2.imshow('AI Surveillance', frame)

            # Break the loop on 'q'
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Clean up
        camera.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()
```
```python
# utils/alerts.py
import os
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage

import cv2
from dotenv import load_dotenv

class AlertSystem:
    def __init__(self, cooldown=60):
        """
        Initialize the alert system.

        Args:
            cooldown: Cooldown period between alerts of the same type (seconds)
        """
        load_dotenv()
        self.last_alert_time = {}
        self.cooldown = cooldown
        self.email_sender = os.getenv('EMAIL_SENDER')
        self.email_password = os.getenv('EMAIL_PASSWORD')
        self.email_receiver = os.getenv('EMAIL_RECEIVER')
        self.log_dir = os.path.join('logs', 'alerts')
        os.makedirs(self.log_dir, exist_ok=True)

    def trigger_alert(self, threat_type, confidence, image=None):
        """
        Trigger an alert for a detected threat.

        Args:
            threat_type: Type of threat detected
            confidence: Confidence score
            image: Frame showing the threat
        """
        current_time = time.time()

        # Skip if we are still in the cooldown period for this threat type
        if threat_type in self.last_alert_time:
            time_since_last = current_time - self.last_alert_time[threat_type]
            if time_since_last < self.cooldown:
                return

        # Update the last alert time
        self.last_alert_time[threat_type] = current_time

        # Log the alert
        alert_msg = f"ALERT: {threat_type} detected with confidence {confidence:.2f}"
        print(alert_msg)

        # Save the image
        img_path = None
        if image is not None:
            timestamp = time.strftime("%Y%m%d-%H%M%S")
            img_path = os.path.join(self.log_dir, f"{threat_type}_{timestamp}.jpg")
            cv2.imwrite(img_path, image)

        # Send an email alert
        self._send_email_alert(threat_type, confidence, img_path)

    def _send_email_alert(self, threat_type, confidence, image_path=None):
        """Send an email alert for the detected threat."""
        if not all([self.email_sender, self.email_password, self.email_receiver]):
            print("Email credentials not set. Skipping email alert.")
            return
        try:
            msg = MIMEMultipart()
            msg['From'] = self.email_sender
            msg['To'] = self.email_receiver
            msg['Subject'] = f"SECURITY ALERT: {threat_type.upper()} Detected!"
            body = f"""
            Security Alert:
            A {threat_type} has been detected with confidence {confidence:.2f}.
            Timestamp: {time.strftime("%Y-%m-%d %H:%M:%S")}
            Please check the surveillance system immediately.
            """
            msg.attach(MIMEText(body, 'plain'))

            # Attach the image if available
            if image_path and os.path.exists(image_path):
                with open(image_path, 'rb') as img_file:
                    img = MIMEImage(img_file.read())
                img.add_header('Content-Disposition', 'attachment',
                               filename=os.path.basename(image_path))
                msg.attach(img)

            # Send the email
            with smtplib.SMTP('smtp.gmail.com', 587) as server:
                server.starttls()
                server.login(self.email_sender, self.email_password)
                server.send_message(msg)
            print(f"Email alert sent to {self.email_receiver}")
        except Exception as e:
            print(f"Failed to send email alert: {e}")
```

The `alerts.py` module above sends email directly over SMTP (Gmail in this example). As an alternative, the project can use Mailgun for email alerts and Twilio for SMS alerts. You'll need to create accounts with both services and obtain API keys to enable these features.
- Create a Mailgun account: Go to the Mailgun website and sign up for an account. You can choose the "pay-as-you-go" plan, which includes a free trial period.
- Obtain your API key: After signing up, navigate to your Mailgun dashboard. Find your API key (usually under "Settings" or "API Keys").
- Find your domain name: Mailgun uses domains for sending emails. You'll need to add and verify a domain (or use a sandbox domain for testing). Find your domain name in your Mailgun dashboard.
- Determine your sender email address: This is the email address that alerts will be sent from. It should be associated with your Mailgun domain.
- Create a Twilio account: Go to the Twilio website and sign up for a free trial account.
- Obtain your Account SID and Auth Token: After signing up, you'll find your Account SID and Auth Token on your Twilio dashboard.
- Get a Twilio phone number: You'll need a Twilio phone number to send SMS messages. You can get a trial number as part of your free trial.
- Determine the receiver phone number: This is the phone number where alerts will be sent.
If you want to receive alerts via WhatsApp, you'll need to configure Twilio for WhatsApp messaging:
- Set up a WhatsApp Business Profile: Follow the instructions in the Twilio documentation to connect your Twilio account to a WhatsApp Business Profile.
- Get your WhatsApp sender number: This will be a Twilio phone number that's enabled for WhatsApp. You can find it in your Twilio console. It will typically be your Twilio phone number, but you'll use it with the `whatsapp:` prefix (e.g., `whatsapp:+14155238886`).
- Determine your WhatsApp receiver number: This is the number where you want to receive WhatsApp alerts. It must also include the `whatsapp:` prefix (e.g., `whatsapp:+1234567890`).
Once you have the necessary information from Mailgun and Twilio, populate the .env file in the project root directory with the following values:
```
MAILGUN_API_KEY=your_mailgun_api_key
MAILGUN_DOMAIN=your_mailgun_domain
MAILGUN_SENDER=your_sender_email_address
TWILIO_ACCOUNT_SID=your_twilio_account_sid
TWILIO_AUTH_TOKEN=your_twilio_auth_token
TWILIO_PHONE_NUMBER=your_twilio_phone_number
SMS_RECEIVER_NUMBER=your_receiver_phone_number
EMAIL_RECEIVER=your_email_receiver
```

Replace the placeholders (e.g., `your_mailgun_api_key`) with your actual credentials.
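With the `.env` file populated, the Mailgun and Twilio channels can be wired up. The sketch below is illustrative, not part of the original codebase: the email path uses Mailgun's standard HTTP messages endpoint via `requests`, and the SMS path assumes the official `twilio` package (`pip install twilio`). The module and function names are my own.

```python
# utils/notify.py -- illustrative sketch of Mailgun email and Twilio SMS alerts
import os

def build_mailgun_request(domain, sender, receiver, subject, text):
    """Assemble the URL and form payload for Mailgun's messages endpoint."""
    url = f"https://api.mailgun.net/v3/{domain}/messages"
    data = {"from": sender, "to": receiver, "subject": subject, "text": text}
    return url, data

def send_email_alert(subject, text):
    """Send an email through Mailgun's HTTP API, using credentials from the .env file."""
    import requests  # pip install requests
    url, data = build_mailgun_request(
        os.getenv("MAILGUN_DOMAIN"),
        os.getenv("MAILGUN_SENDER"),
        os.getenv("EMAIL_RECEIVER"),
        subject, text,
    )
    resp = requests.post(url, auth=("api", os.getenv("MAILGUN_API_KEY")), data=data)
    resp.raise_for_status()

def send_sms_alert(text):
    """Send an SMS (or a WhatsApp message, if the numbers carry the whatsapp: prefix) via Twilio."""
    from twilio.rest import Client  # pip install twilio
    client = Client(os.getenv("TWILIO_ACCOUNT_SID"), os.getenv("TWILIO_AUTH_TOKEN"))
    client.messages.create(
        body=text,
        from_=os.getenv("TWILIO_PHONE_NUMBER"),   # e.g. whatsapp:+14155238886 for WhatsApp
        to=os.getenv("SMS_RECEIVER_NUMBER"),
    )
```

These helpers could be called from `AlertSystem.trigger_alert` alongside (or instead of) the SMTP path shown earlier.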
```python
# utils/recorder.py
import os
from datetime import datetime

import cv2

class VideoRecorder:
    def __init__(self, output_dir='recordings', fps=20, resolution=(640, 480)):
        """
        Initialize the video recorder.

        Args:
            output_dir: Directory to save recordings
            fps: Frames per second
            resolution: Video resolution (width, height)
        """
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        self.fps = fps
        self.resolution = resolution
        self.writer = None
        self.recording = False

    def start_recording(self):
        """Start a new recording."""
        if self.recording:
            self.stop_recording()
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(self.output_dir, f"surveillance_{timestamp}.mp4")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        self.writer = cv2.VideoWriter(output_path, fourcc, self.fps, self.resolution)
        self.recording = True
        print(f"Started recording: {output_path}")
        return output_path

    def write_frame(self, frame):
        """Write a frame to the current recording."""
        if self.recording and self.writer:
            frame_resized = cv2.resize(frame, self.resolution)
            self.writer.write(frame_resized)

    def stop_recording(self):
        """Stop the current recording."""
        if self.recording and self.writer:
            self.writer.release()
            self.writer = None
            self.recording = False
            print("Recording stopped")
```
```python
# config/settings.py
import os

import yaml

class Settings:
    def __init__(self, config_path='config/config.yaml'):
        """
        Load configuration from a YAML file.

        Args:
            config_path: Path to the config file
        """
        self.config_path = config_path
        self.config = {}
        self.load_config()

    def load_config(self):
        """Load the configuration from file, creating a default one if absent."""
        if os.path.exists(self.config_path):
            with open(self.config_path, 'r') as f:
                self.config = yaml.safe_load(f)
        else:
            # Default configuration
            self.config = {
                'camera': {
                    'source': 0,
                    'width': 640,
                    'height': 480,
                    'fps': 30
                },
                'model': {
                    'path': 'yolov8n.pt',
                    'conf_threshold': 0.25,
                    'iou_threshold': 0.45
                },
                'alerts': {
                    'cooldown': 60,
                    'email_enabled': False
                },
                'recording': {
                    'enabled': True,
                    'trigger_on_detection': True,
                    'clip_duration': 30,   # seconds
                    'storage_limit': 10    # GB
                },
                'display': {
                    'show_window': True,
                    'draw_boxes': True,
                    'draw_labels': True
                }
            }
            # Save the default config
            os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
            with open(self.config_path, 'w') as f:
                yaml.dump(self.config, f, default_flow_style=False)

    def get(self, section, key=None):
        """Get a configuration section or a single value."""
        if key is None:
            return self.config.get(section, {})
        return self.config.get(section, {}).get(key)
```
```python
# test_system.py
import time

import cv2

from utils.camera import Camera
from utils.detector import load_model, detect_objects

def test_detection_speed():
    """Measure detection speed on live frames."""
    model = load_model()
    camera = Camera(0)
    camera.start()

    frame_count = 0
    start_time = time.time()
    duration = 10  # Test for 10 seconds

    print("Testing detection speed...")
    while time.time() - start_time < duration:
        frame = camera.read_frame()
        if frame is None:
            break

        # Run detection
        results = detect_objects(model, frame)

        # Plot and display the annotated results
        processed_frame = results.plot()
        cv2.imshow('Test Detection', processed_frame)
        frame_count += 1

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    elapsed_time = time.time() - start_time
    if frame_count > 0:
        fps = frame_count / elapsed_time
        print(f"Average FPS: {fps:.2f}")
        print(f"Frame processing time: {1000 / fps:.2f} ms")
    else:
        print("No frames processed")

    camera.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    test_detection_speed()
```
```python
# Optimize the YOLOv8 model for faster inference (add to main.py)
def optimize_model(model_path, format='onnx'):
    """
    Optimize the model for faster inference.

    Args:
        model_path: Path to the YOLOv8 model
        format: Export format (onnx, torchscript, etc.)

    Returns:
        Path to the optimized model
    """
    from ultralytics import YOLO
    model = YOLO(model_path)
    optimized_path = model.export(format=format)
    print(f"Model optimized and exported to: {optimized_path}")
    return optimized_path
```
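Beyond exporting the model, a cheap speedup is to run detection only on every Nth frame and reuse the most recent results in between, which keeps the display smooth while cutting inference load. A small sketch of such a throttle (the class name is my own; tune `interval` against the FPS numbers from `test_system.py`):

```python
# Illustrative frame-skipping helper: run the detector on every Nth frame only
class FrameSkipper:
    def __init__(self, interval=3):
        """
        Args:
            interval: run detection on every `interval`-th frame
        """
        self.interval = interval
        self.counter = 0
        self.last_results = None

    def process(self, frame, detect_fn):
        """Run detect_fn on this frame, or reuse the most recent results."""
        if self.counter % self.interval == 0:
            self.last_results = detect_fn(frame)
        self.counter += 1
        return self.last_results
```

In the main loop this would wrap the detection call, e.g. `results = skipper.process(frame, lambda f: detect_objects(model, f))`.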
```python
# deploy.py
import argparse
import os
import subprocess

def setup_environment():
    """Set up the environment for deployment."""
    # Install required packages
    subprocess.run(['pip', 'install', '-r', 'requirements.txt'], check=True)

    # Create the necessary directories
    os.makedirs('logs', exist_ok=True)
    os.makedirs('recordings', exist_ok=True)
    os.makedirs('models', exist_ok=True)

    # Download the YOLOv8 model if not present
    if not os.path.exists('models/yolov8n.pt'):
        subprocess.run(['python', '-c',
                        'from ultralytics import YOLO; YOLO("yolov8n.pt").save("models/yolov8n.pt")'])

def create_service_file(service_name, working_dir, user):
    """Create a systemd service file for autostart."""
    service_content = f"""[Unit]
Description=AI Surveillance System
After=network.target

[Service]
Type=simple
User={user}
WorkingDirectory={working_dir}
ExecStart=/usr/bin/python3 {working_dir}/main.py
Restart=on-failure
RestartSec=5s

[Install]
WantedBy=multi-user.target
"""
    with open(f"{service_name}.service", 'w') as f:
        f.write(service_content)
    print(f"Created service file: {service_name}.service")
    print("To install the service:")
    print(f"sudo cp {service_name}.service /etc/systemd/system/")
    print("sudo systemctl daemon-reload")
    print(f"sudo systemctl enable {service_name}")
    print(f"sudo systemctl start {service_name}")

def main():
    parser = argparse.ArgumentParser(description='Deploy AI Surveillance System')
    parser.add_argument('--create-service', action='store_true', help='Create a systemd service file')
    parser.add_argument('--service-name', default='ai-surveillance', help='Name of the service')
    parser.add_argument('--user', default=os.getenv('USER'), help='User to run the service')
    args = parser.parse_args()

    # Set up the environment
    setup_environment()

    # Create the service file if requested
    if args.create_service:
        working_dir = os.path.abspath('.')
        create_service_file(args.service_name, working_dir, args.user)

    print("Deployment setup complete!")

if __name__ == "__main__":
    main()
```
```
# requirements.txt
ultralytics==8.0.147
opencv-python==4.7.0.72
numpy==1.24.3
pandas==2.0.1
matplotlib==3.7.1
Flask==2.3.2
requests==2.30.0
python-dotenv==1.0.0
pyyaml==6.0
```
- Ultralytics YOLOv8: https://github.com/ultralytics/ultralytics
  - The primary library for object detection, providing pre-trained models and training capabilities
- OpenCV: https://github.com/opencv/opencv-python
  - For camera integration, video processing, and basic computer vision operations
- Flask/FastAPI: For creating web dashboards or APIs
  - Flask: https://github.com/pallets/flask
  - FastAPI: https://github.com/tiangolo/fastapi
- onnxruntime: https://github.com/microsoft/onnxruntime
  - For optimized inference on various platforms
- Ultralytics YOLOv8 Documentation
- OpenCV Documentation
- PyTorch Documentation
- Raspberry Pi Documentation (for edge deployment)
After completing the prototype:
- Performance Optimization: Fine-tune model parameters for your specific hardware
- Custom Training: Train models on your specific threat scenarios
- Edge Deployment: Deploy on Raspberry Pi or other edge devices
- UI Development: Create a user-friendly dashboard for monitoring
- Integration: Connect with existing security systems
- Scale Up: Expand to multiple cameras and locations