Files
LJ360/mjpeg_streamer.py
2026-01-08 16:03:39 +08:00

411 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# mjpeg_streamer.pynano
import threading
import time
import os
import shutil
from datetime import datetime, timedelta
from flask import Flask, Response, render_template, request, redirect, session, url_for, send_from_directory
import cv2
import numpy as np
import subprocess
import stat
# ====== 新增:登录配置 ======
AUTO_LOGIN = True # 👈 设置为 True 可跳过登录
VALID_USER = {"username": "admin", "password": "admin"}
class MJPEGServer:
def __init__(self, frame_buffer, host="0.0.0.0", port=8080):
self.frame_buffer = frame_buffer
self.host = host
self.port = port
self.app = Flask(__name__)
self.app.secret_key = 'your-secret-key-change-in-prod' # 用于 session
# H264编码器参数
self.h264_encoder = None
self.h264_fourcc = None
self.h264_width = None
self.h264_height = None
# 视频录制配置
self.recording_enabled = True
self.segment_duration = 60 # 视频分段时长(秒)
self.storage_path = "/video_records" # 视频存储路径
self.max_retention_days = 30 # 最大保留天数
self.video_writer = None
self.current_segment_start = None
self.recording_thread = None
self.recording_stop_event = threading.Event()
# 确保存储目录存在
os.makedirs(self.storage_path, exist_ok=True)
os.chmod(self.storage_path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
# 路由
self.app.add_url_rule('/', 'index', self.index)
self.app.add_url_rule('/login', 'login', self.login, methods=['GET', 'POST'])
self.app.add_url_rule('/logout', 'logout', self.logout)
self.app.add_url_rule('/video_feed', 'video_feed', self.video_feed)
self.app.add_url_rule('/h264_feed', 'h264_feed', self.h264_feed)
self.app.add_url_rule('/api/videos', 'get_videos', self.get_videos)
self.app.add_url_rule('/api/video/<path:video_path>', 'serve_video', self.serve_video)
self.app.add_url_rule('/api/disk_usage', 'get_disk_usage', self.get_disk_usage)
# 静态文件自动托管Layui
self.app.static_folder = 'static'
def is_logged_in(self):
return session.get('logged_in', False)
def check_auth(self):
if AUTO_LOGIN:
session['logged_in'] = True
return True
return self.is_logged_in()
def index(self):
if not self.check_auth():
return redirect(url_for('login'))
return render_template('index.html')
def login(self):
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if username == VALID_USER['username'] and password == VALID_USER['password']:
session['logged_in'] = True
return redirect(url_for('index'))
else:
return '<script>alert("用户名或密码错误");window.history.back();</script>'
return render_template('login.html')
def logout(self):
session.pop('logged_in', None)
return redirect(url_for('login'))
def video_feed(self):
if not self.check_auth():
return '', 403
return Response(self._gen(),
mimetype='multipart/x-mixed-replace; boundary=frame')
def _gen(self):
while True:
success, frame = self.frame_buffer.get_frame()
if not success or frame is None:
time.sleep(0.1)
continue
ret, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 95])
if not ret:
continue
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')
def h264_feed(self):
"""提供H264视频流"""
if not self.check_auth():
return '', 403
return Response(self._gen_h264_ffmpeg(),
mimetype='video/H264')
def _gen_h264_ffmpeg(self):
"""使用ffmpeg生成H264视频流"""
# 获取第一帧以确定视频参数
success, frame = self.frame_buffer.get_frame()
if not success or frame is None:
raise ValueError("无法获取视频帧")
height, width = frame.shape[:2]
fps = 25.0
# 构造ffmpeg命令
ffmpeg_cmd = [
'ffmpeg',
'-f', 'rawvideo',
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24',
'-s', f'{width}x{height}',
'-r', str(fps),
'-i', '-',
'-c:v', 'libx264',
'-preset', 'ultrafast',
'-tune', 'zerolatency',
'-b:v', '2000k',
'-f', 'h264',
'-'
]
# 启动ffmpeg进程
ffmpeg_process = subprocess.Popen(
ffmpeg_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
try:
while True:
# 获取新帧
success, frame = self.frame_buffer.get_frame()
if not success or frame is None:
time.sleep(0.01)
continue
# 确保帧尺寸一致
if frame.shape[1] != width or frame.shape[0] != height:
frame = cv2.resize(frame, (width, height))
# 写入ffmpeg标准输入
ffmpeg_process.stdin.write(frame.tobytes())
ffmpeg_process.stdin.flush()
# 读取编码后的H264数据
if ffmpeg_process.stdout.poll() is None:
h264_data = ffmpeg_process.stdout.read(4096)
if h264_data:
yield h264_data
finally:
# 清理资源
if ffmpeg_process.stdin:
ffmpeg_process.stdin.close()
if ffmpeg_process.stdout:
ffmpeg_process.stdout.close()
if ffmpeg_process.stderr:
ffmpeg_process.stderr.close()
ffmpeg_process.wait()
def _start_recording_thread(self):
"""启动视频录制线程"""
if not self.recording_thread or not self.recording_thread.is_alive():
self.recording_stop_event.clear()
self.recording_thread = threading.Thread(target=self._record_video, daemon=True)
self.recording_thread.start()
print(f"[RECORDING] 视频录制线程已启动,分段时长: {self.segment_duration}")
def _record_video(self):
"""视频录制主循环"""
while not self.recording_stop_event.is_set():
try:
success, frame = self.frame_buffer.get_frame()
if not success or frame is None:
time.sleep(0.1)
continue
# 检查是否需要创建新的视频分段
now = datetime.now()
if not self.video_writer or (now - self.current_segment_start).total_seconds() >= self.segment_duration:
self._create_new_segment(frame)
# 写入视频帧
if self.video_writer:
if self.video_writer == "ffmpeg":
# 使用FFmpeg写入帧
if hasattr(self, 'ffmpeg_process') and self.ffmpeg_process.stdin:
try:
self.ffmpeg_process.stdin.write(frame.tobytes())
self.ffmpeg_process.stdin.flush()
except BrokenPipeError:
print(f"[ERROR] FFmpeg进程已断开重新创建分段")
self._create_new_segment(frame)
else:
# 使用OpenCV写入帧
self.video_writer.write(frame)
# 定期清理旧视频
if now.second % 60 == 0: # 每分钟检查一次
self._clean_old_videos()
# 移除sleep让录制线程尽可能快地处理帧
except Exception as e:
print(f"[RECORDING ERROR] {e}")
time.sleep(1)
def _create_new_segment(self, frame):
"""创建新的视频分段文件"""
# 关闭当前视频写入器
if self.video_writer:
if self.video_writer == "ffmpeg":
# 关闭FFmpeg进程
if hasattr(self, 'ffmpeg_process'):
try:
if self.ffmpeg_process.stdin:
self.ffmpeg_process.stdin.close()
if self.ffmpeg_process.stdout:
self.ffmpeg_process.stdout.close()
if self.ffmpeg_process.stderr:
self.ffmpeg_process.stderr.close()
self.ffmpeg_process.wait(timeout=5)
except Exception as e:
print(f"[ERROR] 关闭FFmpeg进程失败: {e}")
self.ffmpeg_process.terminate()
self.ffmpeg_process.wait(timeout=2)
else:
# 关闭OpenCV VideoWriter
self.video_writer.release()
self.video_writer = None
# 创建日期目录
now = datetime.now()
self.current_segment_start = now
date_dir = os.path.join(self.storage_path, now.strftime("%Y-%m-%d"))
os.makedirs(date_dir, exist_ok=True)
os.chmod(date_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
# 生成视频文件名
timestamp = now.strftime("%H-%M-%S")
video_path = os.path.join(date_dir, f"video_{timestamp}.mp4")
# 创建新的视频写入器
height, width = frame.shape[:2]
fps = 15.0
# 使用FFmpeg通过subprocess创建视频录制进程
try:
# 构建FFmpeg命令
self.ffmpeg_cmd = [
'ffmpeg',
'-y', # 覆盖现有文件
'-f', 'rawvideo', # 输入格式为原始视频
'-vcodec', 'rawvideo',
'-pix_fmt', 'bgr24', # OpenCV默认的BGR格式
'-s', f'{width}x{height}', # 视频分辨率
'-r', str(fps), # 帧率
'-i', '-', # 从标准输入读取
'-c:v', 'libx264', # 使用H.264编码器
'-preset', 'ultrafast', # 编码速度优先
'-tune', 'zerolatency', # 低延迟
'-b:v', '2M', # 比特率
'-f', 'mp4', # 输出格式为MP4
video_path
]
# 启动FFmpeg进程
self.ffmpeg_process = subprocess.Popen(
self.ffmpeg_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.video_writer = "ffmpeg" # 标记为使用FFmpeg
print(f"[RECORDING] 使用FFmpeg开始录制新分段: {video_path}")
except Exception as e:
print(f"[ERROR] FFmpeg录制失败: {e}")
# 降级使用OpenCV VideoWriter
fourcc = cv2.VideoWriter_fourcc(*"MJPG")
self.video_writer = cv2.VideoWriter(video_path + '.avi', fourcc, fps, (width, height))
print(f"[RECORDING] 降级使用OpenCV开始录制新分段: {video_path}.avi")
def _clean_old_videos(self):
"""清理超过保留期限的旧视频"""
try:
cutoff_date = datetime.now() - timedelta(days=self.max_retention_days)
cutoff_str = cutoff_date.strftime("%Y-%m-%d")
# 遍历所有日期目录
for date_dir in os.listdir(self.storage_path):
if date_dir < cutoff_str:
dir_path = os.path.join(self.storage_path, date_dir)
if os.path.isdir(dir_path):
shutil.rmtree(dir_path)
print(f"[CLEANUP] 删除过期视频目录: {dir_path}")
except Exception as e:
print(f"[CLEANUP ERROR] {e}")
def get_videos(self):
"""API: 获取视频文件列表"""
if not self.check_auth():
return {'error': 'Unauthorized'}, 403
videos = []
try:
# 获取所有日期目录
date_dirs = sorted([d for d in os.listdir(self.storage_path) if os.path.isdir(os.path.join(self.storage_path, d))], reverse=True)
for date_dir in date_dirs:
date_path = os.path.join(self.storage_path, date_dir)
video_files = sorted([f for f in os.listdir(date_path) if f.endswith('.avi') or f.endswith('.mp4')])
for video_file in video_files:
video_path = os.path.join(date_dir, video_file)
full_path = os.path.join(self.storage_path, video_path)
size = os.path.getsize(full_path) / (1024 * 1024) # MB
mtime = os.path.getmtime(full_path)
# 根据文件扩展名获取时间
if video_file.endswith('.avi'):
time_str = video_file.split('_')[1].replace('.avi', '')
else:
time_str = video_file.split('_')[1].replace('.mp4', '')
videos.append({
'path': video_path,
'date': date_dir,
'time': time_str,
'size': round(size, 2),
'mtime': mtime
})
except Exception as e:
print(f"[API ERROR] 获取视频列表失败: {e}")
return {'error': 'Failed to get videos'}, 500
return {'videos': videos}
def serve_video(self, video_path):
"""API: 提供视频文件下载/播放"""
if not self.check_auth():
return {'error': 'Unauthorized'}, 403
try:
# 确保路径安全,防止目录遍历
full_path = os.path.abspath(os.path.join(self.storage_path, video_path))
if not full_path.startswith(os.path.abspath(self.storage_path)):
return {'error': 'Invalid path'}, 400
directory = os.path.dirname(full_path)
filename = os.path.basename(full_path)
# 根据文件扩展名设置正确的MIME类型
if filename.endswith('.avi'):
mimetype = 'video/x-msvideo'
elif filename.endswith('.mp4'):
mimetype = 'video/mp4'
else:
mimetype = 'video/mp4' # 默认使用mp4的MIME类型
return send_from_directory(directory, filename, mimetype=mimetype)
except Exception as e:
print(f"[API ERROR] 提供视频文件失败: {e}")
return {'error': 'Failed to serve video'}, 500
def get_disk_usage(self):
"""API: 获取磁盘使用情况"""
if not self.check_auth():
return {'error': 'Unauthorized'}, 403
try:
statvfs = os.statvfs(self.storage_path)
total = statvfs.f_frsize * statvfs.f_blocks / (1024 * 1024 * 1024) # GB
used = statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree) / (1024 * 1024 * 1024) # GB
free = statvfs.f_frsize * statvfs.f_bfree / (1024 * 1024 * 1024) # GB
usage = (used / total) * 100 if total > 0 else 0
return {
'total': round(total, 2),
'used': round(used, 2),
'free': round(free, 2),
'usage_percent': round(usage, 1)
}
except Exception as e:
print(f"[API ERROR] 获取磁盘使用情况失败: {e}")
return {'error': 'Failed to get disk usage'}, 500
def start(self):
# 启动视频录制
self._start_recording_thread()
# 启动Web服务器
thread = threading.Thread(
target=self.app.run,
kwargs={'host': self.host, 'port': self.port, 'debug': False, 'use_reloader': False},
daemon=True
)
thread.start()
print(f"[MJPEG] Web 系统已启动,访问 http://{self.host}:{self.port}")