本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示,同时,统计目标数量以及违停数量,生成结果评估,一并发送到前端展示。对于本文任何技术上的空缺,可在博主主页前面博客寻找,有任何问题欢迎私信或评论区讨论!!!
涉及技术栈:
Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+异步+多进程+flv.js+node.js
web端推拉流测试、抽帧识别计数,一键式生成巡检报告
项目结构(Django):
├── DJI_yolo
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── settings.py
│   ├── templates
│   ├── urls.py
│   └── wsgi.py
├── app01
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── car_best.pt
│   ├── consumers
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   ├── detection_consumer.py
│   │   └── report_consumer.py
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── pictures
│   ├── routings.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   └── detection_engine.py
│   ├── stream_status.py
│   ├── tests.py
│   ├── utils.py
│   └── views.py
├── manage.py
├── media
│   ├── 2025-06-02
│   │   ├── 20时14分54秒-20时17分03秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频
│   │   ├── 20时26分11秒-20时29分00秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频
存在的问题,亟需解决:
1、后端逐帧识别并返回前端,前端显示卡顿,考虑跳帧识别,一秒识别一帧,剩余原始帧直接返回。
2、曾尝试过使用 yolo 中的 model 类中的 track 方法进行跨帧追踪,为每帧每个目标进行编号(id),提取帧中目标特征(速度、目标面积、唯位移等等),进行特征化工程,从而判断相邻两识别帧中的某两目标是否为同一目标。由于添加追踪算法,单帧识别时间过长,单帧识别时间大于拉流帧传输时间,导致进程堵塞,程序崩溃。后续采取措施改进。
3、模型并未做违停检测,考虑重新训练模型,添加违停训练集,重新训练,顺便搭配 yolo12 环境,用 yolo12 进行训练。
4、Django 后端接受到流,并不能直接开始识别,而是先用四五秒时间加载模型,需要优化。
5、执行任务、完成任务到数据存储、前端显示巡检报告,延迟较高,需要优化。
6、为后端添加车辆运动状态算法,若连续识别的两个帧同一目标静止,并且识别为违停,则判断为违停;若非静止但识别违停,不做处理。相对静止判断?无人机在动,无法建立参考系。
7、进行车牌识别。
代码及粗略解释:
detection_consumer.py:
此处代码使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的类进行帧识别和计数,然后判断计数,调用讯飞星火 API 生成 AI 答复,获取关键帧,将所有信息封装成一个字典,并传给前端,以用于违停报告生成。同时,将实时识别帧通过 websocket 的消费者传给前端显示。
- 
import asyncio
 - 
import base64
 - 
import os
 - 
import subprocess
 - 
import time
 - 
import traceback
 - 
from datetime import datetime
 - 
from pathlib import Path
 - 
from .report_consumer import ReportConsumer
 - 
import numpy as np
 - 
import cv2
 - 
from channels.generic.websocket import AsyncWebsocketConsumer
 - 
from channels.layers import get_channel_layer
 - 
from app01.services.detection_engine import DetectionEngine, logger
 - 
from concurrent.futures import ThreadPoolExecutor
 - 
from app01.utils import rename_time_folder # 导入重命名函数
 - 
from app01.stream_status import stream_status, status_lock # 从新模块导入
 - 
import requests
 - 
 - 
RTMP_URL = “rtmp://127.0.0.1:1935/live/stream”
 - 
channel_layer = get_channel_layer()
 - 
 - 
class DetectionStreamConsumer(AsyncWebsocketConsumer):
 - 
async def connect(self):
 - 
await self.channel_layer.group_add(“detection_group”, self.channel_name)
 - 
await self.accept()
 - 
print(“WebSocket 连接建立成功”)
 - 
 - 
async def disconnect(self, code):
 - 
await self.channel_layer.group_discard(“detection_group”, self.channel_name)
 - 
print(“WebSocket 连接关闭”)
 - 
 - 
async def start_detection(self):
 - 
print(“开始检测帧流…”)
 - 
await self.monitor_stream_status()
 - 
 - 
async def monitor_stream_status(self):
 - 
while True:
 - 
if stream_status.is_streaming:
 - 
logger.info(“检测到推流,开始处理…”)
 - 
await self.start_detection_stream()
 - 
else:
 - 
logger.info(“当前无推流,等待中…”)
 - 
await asyncio.sleep(1)
 - 
 - 
async def start_detection_stream(self, all_dict=None):
 - 
ffmpeg_pull_command = [
 - 
“ffmpeg”,
 - 
“-c:v”, “h264”,
 - 
“-i”, RTMP_URL,
 - 
“-f”, “rawvideo”,
 - 
“-pix_fmt”, “bgr24”,
 - 
“-s”, “640×360”,
 - 
“-r”, “5”,
 - 
“-vcodec”, “rawvideo”,
 - 
“-an”,
 - 
“-flags”, “+low_delay”,
 - 
“-tune”, “zerolatency”,
 - 
“-“
 - 
]
 - 
 - 
try:
 - 
process = subprocess.Popen(
 - 
ffmpeg_pull_command,
 - 
stdout=subprocess.PIPE,
 - 
stderr=subprocess.DEVNULL,
 - 
bufsize=10 * 8
 - 
)
 - 
except Exception as e:
 - 
logger.error(f”FFmpeg 子进程启动失败: {e}“)
 - 
return
 - 
 - 
if process.stdout is None:
 - 
logger.error(“FFmpeg stdout 为空”)
 - 
return
 - 
 - 
frame_width, frame_height = 640, 360
 - 
frame_size = frame_width * frame_height * 3
 - 
executor = ThreadPoolExecutor(max_workers=4) # 最多可以同时运行 4 个线程
 - 
engine = DetectionEngine()
 - 
frame_count = 0
 - 
skip_interval = 1
 - 
 - 
try:
 - 
while stream_status.is_streaming:
 - 
try:
 - 
loop = asyncio.get_event_loop()
 - 
raw_frame = await asyncio.wait_for(
 - 
loop.run_in_executor(executor, process.stdout.read, frame_size),
 - 
timeout=8 # 超时检测断流
 - 
)
 - 
except asyncio.TimeoutError:
 - 
logger.warning(“读取帧超时,触发流结束”)
 - 
with status_lock:
 - 
stream_status.is_streaming = False
 - 
break
 - 
 - 
if len(raw_frame) != frame_size:
 - 
continue
 - 
 - 
try:
 - 
frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))
 - 
except Exception as e:
 - 
logger.warning(f”帧解析失败: {e}“)
 - 
continue
 - 
 - 
frame_count += 1
 - 
if frame_count % skip_interval != 0:
 - 
continue
 - 
 - 
result = await loop.run_in_executor(executor, engine.process_frame, frame)
 - 
# 增加空值检查
 - 
if result is None:
 - 
logger.warning(“检测结果为空,跳过处理”)
 - 
continue
 - 
processed_frame, detected_classes = result
 - 
 - 
# 更新车辆统计(仅按类别)
 - 
with DetectionEngine.counter_lock:
 - 
for class_id in detected_classes:
 - 
if class_id == 0:
 - 
DetectionEngine.total_count[‘car’] += 1
 - 
DetectionEngine.total_count[‘total’] += 1
 - 
elif class_id == 1:
 - 
DetectionEngine.total_count[‘bus’] += 1
 - 
DetectionEngine.total_count[‘total’] += 1
 - 
elif class_id == 2:
 - 
DetectionEngine.total_count[‘truck’] += 1
 - 
DetectionEngine.total_count[‘total’] += 1
 - 
elif class_id == 3:
 - 
DetectionEngine.total_count[‘van’] += 1
 - 
DetectionEngine.total_count[‘total’] += 1
 - 
 - 
_, jpeg = cv2.imencode(‘.jpg’, processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
 - 
if channel_layer:
 - 
await channel_layer.group_send(
 - 
“detection_group”,
 - 
{“type”: “send_frame”, “frame”: jpeg.tobytes()}
 - 
)
 - 
except Exception as e:
 - 
logger.error(f”检测处理错误: {e}“)
 - 
logger.error(traceback.format_exc())
 - 
finally:
 - 
logger.warning(“流结束,进入处理…”)
 - 
if process.returncode is None:
 - 
process.kill()
 - 
logger.warning(“FFmpeg 进程已终止”)
 - 
 - 
# 延迟一小段时间,确保文件操作完全释放
 - 
time.sleep(1)
 - 
 - 
stream_status.is_streaming = False
 - 
stream_status.end_datetime = datetime.now() # 记录结束时间
 - 
 - 
logger.warning(“正在获取三个关键帧…”)
 - 
# 找3个关键帧,必须在重命名前执行
 - 
folder = Path(f”{stream_status.image_path}“)
 - 
files = [f for f in folder.iterdir() if f.is_file()]
 - 
if not files:
 - 
return None, None, None
 - 
# 按修改时间排序
 - 
files.sort(key=lambda x: x.stat().st_mtime)
 - 
first = files[0]
 - 
last = files[-1]
 - 
middle = files[len(files) // 2]
 - 
 - 
# 转换为Base64
 - 
stream_status.image_li = [
 - 
self.image_to_base64(str(f)) # 传入字符串路径,避免Path对象序列化问题
 - 
for f in [first, middle, last]
 - 
]
 - 
 - 
logger.warning(“正在重命名文件夹…”)
 - 
# 先复制 time_path,因为后续可能被修改
 - 
time_path = stream_status.time_path
 - 
start_datetime = stream_status.start_datetime
 - 
end_datetime = stream_status.end_datetime
 - 
save_path = “”
 - 
# 调用重命名函数(使用完整 datetime)
 - 
if time_path and start_datetime:
 - 
 - 
path = rename_time_folder(
 - 
time_path,
 - 
start_datetime, # 传入开始时间
 - 
end_datetime # 传入结束时间
 - 
)
 - 
save_path = path
 - 
logger.warning(
 - 
f”文件夹已重命名为 {start_datetime.strftime(‘%H时%M分%S秒’)}–{end_datetime.strftime(‘%H时%M分%S秒’)}“)
 - 
 - 
try:
 - 
logger.warning(“正在更新数据…”)
 - 
# 更新识别结果
 - 
stats = self.correct_overcounted_stats(DetectionEngine.total_count)
 - 
print(“stats:”, stats)
 - 
stream_status.total_stats = stats
 - 
print(“stream_status.total_stats:”, stream_status.total_stats)
 - 
self.save_vehicle_stats(stats, save_path)
 - 
 - 
except Exception as e:
 - 
logger.error(f”保存统计信息失败: {e}“)
 - 
print(“正在获取AI答复…”)
 - 
self.AI_response()
 - 
 - 
# 定义总数据,并序列化
 - 
all_dict = {
 - 
‘datetime’: stream_status.start_time.strftime(“%Y-%m-%d %H:%M:%S.%f”), # 序列化,转为字符串
 - 
‘image_li’: [bs for bs in stream_status.image_li], # 关键帧转二进制
 - 
‘detect_car’: [_ for _ in stream_status.total_stats.values()],
 - 
‘Al_response’: stream_status.AI_talk,
 - 
}
 - 
 - 
print(“all_dict:”, all_dict)
 - 
print(“正在发送信息…”)
 - 
await ReportConsumer.send_report_data(all_dict)
 - 
 - 
executor.shutdown(wait=False)
 - 
DetectionEngine.reset_stats()
 - 
stream_status.reset_all_dict()
 - 
 - 
def save_vehicle_stats(self, stats, save_path=None):
 - 
# 如果未提供路径,使用默认的 time_path
 - 
if not save_path:
 - 
save_path = stream_status.time_path
 - 
 - 
if not save_path:
 - 
logger.warning(“无法保存统计信息:路径为空”)
 - 
return
 - 
 - 
stats_path = os.path.join(save_path, ‘vehicle_stats.txt’)
 - 
try:
 - 
with open(stats_path, ‘w’, encoding=‘utf-8-sig’) as f:
 - 
f.write(“车辆统计结果\n”)
 - 
f.write(“————————-\n”)
 - 
for k, v in stats.items():
 - 
f.write(f”{k}: {v}\n”)
 - 
f.write(“————————-\n”)
 - 
logger.info(f”统计信息已保存至 {stats_path}“)
 - 
except Exception as e:
 - 
logger.error(f”写入统计信息失败: {e}“)
 - 
 - 
 - 
def AI_response(self):
 - 
“””获取AI回复”””
 - 
url = “https://spark-api-open.xf-yun.com/v1/chat/completions”
 - 
headers = {
 - 
“Authorization”: “Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv”,
 - 
“Content-Type”: “application/json”,
 - 
}
 - 
content = (
 - 
f’我现在要做一个无人机城市交通智慧巡检报告,现在巡检结果是:巡检时间:{stream_status.start_time}‘
 - 
f’小汽车有{stream_status.total_stats[“car”]}辆,面包车有{stream_status.total_stats[“van”]}辆,’
 - 
f’公交车有{stream_status.total_stats[“bus”]}辆,卡车有{stream_status.total_stats[“truck”]}辆,’
 - 
f’识别到的违停车辆共有{stream_status.total_stats[“illegally_car”]}辆。’
 - 
f’帮我生成一段结论,要求不要有废话,也不要写具体那四个类别识别到的车辆数,字数200字,语言精炼严谨’
 - 
)
 - 
 - 
question = {
 - 
“role”: “user”,
 - 
“content”: content
 - 
}
 - 
 - 
data = {
 - 
“max_tokens”: 4096,
 - 
“top_k”: 4,
 - 
“messages”: [question],
 - 
“temperature”: 0.5,
 - 
“model”: “4.0Ultra”,
 - 
“stream”: False,
 - 
}
 - 
response = requests.post(url, headers=headers, json=data)
 - 
response.raise_for_status()
 - 
result = response.json()
 - 
 - 
# 提取模型输出的内容
 - 
reply = result.get(‘choices’, [{}])[0].get(‘message’, {}).get(‘content’, ”).strip()
 - 
print(reply)
 - 
stream_status.AI_talk = reply
 - 
 - 
# 读取图片文件并转换为 Base64
 - 
# 静态方法不应接收self参数
 - 
def image_to_base64(image_path):
 - 
with open(image_path, “rb”) as img_file:
 - 
base64_str = base64.b64encode(img_file.read()).decode(“utf-8”)
 - 
return f”data:image/jpeg;base64,{base64_str}“
 - 
 - 
# 去掉self第一个参数
 - 
def correct_overcounted_stats(stats, avg_appearance=6):
 - 
“””
 - 
简单修正严重重复统计的结果
 - 
“””
 - 
corrected = {}
 - 
for cls in [‘car’, ‘van’, ‘bus’, ‘truck’]:
 - 
if cls in stats:
 - 
corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))
 - 
 - 
# 可选:重新计算 total
 - 
corrected[‘total’] = sum(corrected.values())
 - 
corrected[‘illegally_car’] = int(corrected[‘total’] / 100)
 - 
print(corrected)
 - 
 - 
return corrected
 - 
 - 
async def send_frame(self, event):
 - 
frame_bytes = event[“frame”]
 - 
await self.send(bytes_data=frame_bytes)
stream_status.py:
本文件用于 Django 项目全局状态管理,相当于 vue3 中的 pinia , is_streaming 是前端推本地视频、后端上传 SRS 后,变为 True ,后端因为 is_streaming = True 而开始拉流,当超过 6 秒未拉到流,判断流结束,is_streaming = False。还定义了全局状态锁、状态重置方法、时间获取方法、文件夹名获取方法、状态实例。全局状态锁是为了防止多线程修改同一个共享变量,只有with status_lock:在 with 块内,只有当前线程持有锁时才能运行代码,其他线程必须等待,相当于在此处把锁”锁“起来了。
- 
import threading
 - 
 - 
# 全局状态锁和状态对象
 - 
status_lock = threading.Lock()
 - 
class StreamStatus:
 - 
 - 
def __init__(self):
 - 
self.is_streaming = False # 是否正在推流/检测
 - 
self.start_time = None # 推流开始时间(datetime对象)
 - 
self.end_time = None # 推流结束时间(datetime对象)
 - 
self.time_path = None # 当前时间文件夹路径(如 Media/2025-06-01/12点)
 - 
self.total_stats = { # 车辆统计
 - 
“car”: 0,
 - 
“van”: 0,
 - 
“bus”: 0,
 - 
“truck”: 0,
 - 
“total”: 0,
 - 
“illegally_car”: 0,
 - 
}
 - 
self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
 - 
self.image_path = None
 - 
self.image_li = []
 - 
self.AI_talk = None
 - 
self.all_dict = {}
 - 
 - 
 - 
def date_folder(self):
 - 
“””获取日期文件夹名称(如 2025-06-01)”””
 - 
if self.start_time:
 - 
return self.start_time.strftime(“%Y-%m-%d”)
 - 
return None
 - 
 - 
 - 
def start_hour(self):
 - 
“””获取开始小时(如 12)”””
 - 
if self.start_time:
 - 
return self.start_time.hour
 - 
return None
 - 
 - 
def reset(self):
 - 
“””重置状态(用于新任务开始)”””
 - 
self.is_streaming = False
 - 
self.start_time = None
 - 
self.end_time = None
 - 
self.time_path = None
 - 
self.total_stats = {k: 0 for k in self.total_stats}
 - 
self.last_frame_time = None
 - 
 - 
def reset_all_dict(self):
 - 
self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
 - 
self.image_path = None
 - 
self.image_li = []
 - 
self.AI_talk = None
 - 
self.all_dict = {}
 - 
self.all_dict = {}
 - 
# 全局唯一的状态实例
 - 
stream_status = StreamStatus()
 
python
detection_engine.py:
此处是 detection_consumer.py 使用的服务代码块,用于为其提供帧识别服务,consumer 拉到的每一个流都会调用该类中的 process_frame 类方法,并同时进行计数和关键帧保存
- 
import threading
 - 
import time
 - 
import cv2
 - 
from ultralytics import YOLO
 - 
import logging
 - 
import os
 - 
from app01.stream_status import stream_status, status_lock # 从新模块导入
 - 
 - 
logger = logging.getLogger(__name__)
 - 
 - 
MODEL_PATH = “F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt”
 - 
model = YOLO(MODEL_PATH)
 - 
 - 
# 关键帧存储位置
 - 
pictures_dir = “pictures”
 - 
if not os.path.exists(pictures_dir):
 - 
os.makedirs(pictures_dir)
 - 
 - 
 - 
class DetectionEngine:
 - 
frame_counter = 1
 - 
counter_lock = threading.Lock() # 线程锁,保证计数器安全
 - 
 - 
# 新增:全局计数器和已统计 ID
 - 
total_count = {
 - 
‘car’: 0,
 - 
‘van’: 0,
 - 
‘bus’: 0,
 - 
‘truck’: 0,
 - 
“total”: 0,
 - 
“illegally_car”: 0
 - 
}
 - 
# 下次任务重置
 - 
 - 
def reset_stats(cls):
 - 
with cls.counter_lock:
 - 
for k in cls.total_count:
 - 
cls.total_count[k] = 0
 - 
 - 
# 下次任务重置
 - 
 - 
def update_stats(cls, vehicle_type):
 - 
with cls.counter_lock:
 - 
if vehicle_type in cls.total_count:
 - 
cls.total_count[vehicle_type] += 1
 - 
cls.total_count[“total”] += 1
 - 
 - 
def __init__(self):
 - 
self.model = model
 - 
# 调整检测参数,减少计算量
 - 
self.detect_params = {
 - 
‘conf’: 0.3, # 提高置信度阈值,减少无效检测
 - 
‘iou’: 0.5, # 调整 NMS 阈值
 - 
‘imgsz’: [384, 640], # 输入尺寸,降低分辨率
 - 
‘classes’: [0, 1, 2, 3], # 仅检测车辆类别(2: car, 5: bus, 7: truck)
 - 
‘device’: 0, # 使用 GPU
 - 
‘half’: True, # 使用 FP16 精度
 - 
‘show’: False # 关闭实时显示,减少开销
 - 
}
 - 
 - 
def process_frame(self, frame):
 - 
“””处理单帧图像:YOLO推理 + 保存结果到关键帧图片文件夹”””
 - 
results = model(frame)
 - 
detected_objects = []
 - 
annotated_frame = frame.copy() # 复制原始帧以防止修改原图
 - 
# 防御性检查:确保 results 非空且包含有效数据
 - 
if not results or len(results) == 0:
 - 
logger.warning(“未获取到检测结果,跳过处理”)
 - 
return frame, []
 - 
 - 
boxes = results[0].boxes
 - 
if boxes is None or len(boxes) == 0:
 - 
logger.warning(“检测结果为空,跳过处理”)
 - 
return frame, []
 - 
try:
 - 
# 假设类别信息存储在 cls 属性中
 - 
classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, ‘cls’) and boxes.cls is not None else []
 - 
 - 
for class_id in classes:
 - 
detected_objects.append(class_id)
 - 
 - 
annotated_frame = results[0].plot() # 绘制检测框
 - 
except Exception as e:
 - 
logger.error(f”解析检测数据失败: {e}“)
 - 
return annotated_frame, []
 - 
with status_lock:
 - 
car_count = sum(1 for cls in detected_objects if cls == 0)
 - 
if car_count > 18:
 - 
try:
 - 
timestamp = int(time.time())
 - 
save_path = os.path.join(stream_status.image_path, f”keyframe_{timestamp}.jpg”)
 - 
cv2.imwrite(save_path, annotated_frame)
 - 
logger.info(f”关键帧保存成功: {save_path}“)
 - 
except Exception as e:
 - 
logger.error(f”保存关键帧失败: {e}“)
 - 
 - 
return annotated_frame, detected_objects
 
repoet_consumer.py:
用于将前面 detection_consumer.py 中的字典数据发送给前端,首先会将数据缓存到后端,等到前端消费者建立成功,立马发送出去。
- 
import asyncio
 - 
 - 
from channels.generic.websocket import AsyncWebsocketConsumer
 - 
from channels.layers import get_channel_layer
 - 
import json
 - 
 - 
# 缓存队列:用于保存尚未发送的数据
 - 
queue = []
 - 
# 记录是否有消费者连接
 - 
has_consumer = False
 - 
 - 
class ReportConsumer(AsyncWebsocketConsumer):
 - 
async def connect(self):
 - 
global has_consumer
 - 
# 加入指定组
 - 
await self.channel_layer.group_add(“report_group”, self.channel_name)
 - 
await self.accept()
 - 
print(“巡检报告WebSocket 已连接”)
 - 
# 设置有消费者连接的标志
 - 
has_consumer = True
 - 
# 尝试发送缓存数据
 - 
await flush_queue(self.channel_layer)
 - 
 - 
async def disconnect(self, close_code):
 - 
global has_consumer
 - 
# 移除组成员
 - 
await self.channel_layer.group_discard(“report_group”, self.channel_name)
 - 
print(“巡检报告WebSocket 断开连接”)
 - 
# 重置消费者连接标志
 - 
has_consumer = False
 - 
 - 
async def receive(self, text_data=None, bytes_data=None):
 - 
pass
 - 
 - 
async def send_report(self, event):
 - 
“””处理 send_report 类型的消息并发送给客户端”””
 - 
data = event[‘data’]
 - 
print(“【发送到客户端】”, data)
 - 
 - 
# 转换为JSON字符串发送
 - 
try:
 - 
# 处理可能包含的非JSON可序列化对象
 - 
# 这里假设 image_li 包含 Path 对象,需要转换为字符串
 - 
if ‘image_li’ in data:
 - 
data[‘image_li’] = [str(path) for path in data[‘image_li’]]
 - 
 - 
await self.send(text_data=json.dumps(data))
 - 
except Exception as e:
 - 
print(f”【发送到客户端失败】{e}“)
 - 
 - 
 - 
async def send_report_data(cls, data):
 - 
“””供其他模块调用的方法”””
 - 
global queue, has_consumer
 - 
# 不再检查是否有人连接,直接发送或缓存,生产模式用Redis
 - 
channel_layer = get_channel_layer()
 - 
 - 
# 如果没有消费者连接,将数据加入队列
 - 
if not has_consumer:
 - 
queue.append(data)
 - 
print(f”【数据已缓存】等待消费者连接: {data}“)
 - 
else:
 - 
# 有消费者连接,直接发送
 - 
await send_report(channel_layer, data)
 - 
 - 
 - 
# ========== 全局函数 ==========
 - 
# 定期检查并发送缓存数据的任务
 - 
async def queue_check_task():
 - 
channel_layer = get_channel_layer()
 - 
while True:
 - 
await asyncio.sleep(1) # 每秒检查一次
 - 
if queue and has_consumer:
 - 
await flush_queue(channel_layer)
 - 
# 在应用启动时启动队列检查任务
 - 
async def start_queue_check():
 - 
asyncio.create_task(queue_check_task())
 - 
async def send_report(channel_layer, data):
 - 
try:
 - 
await channel_layer.group_send(
 - 
“report_group”,
 - 
{
 - 
“type”: “send_report”,
 - 
“data”: data,
 - 
},
 - 
)
 - 
print(“【发送成功】”, data)
 - 
except Exception as e:
 - 
print(f”【发送失败】{e}“)
 - 
 - 
 - 
async def flush_queue(channel_layer):
 - 
global queue
 - 
if not queue:
 - 
return
 - 
 - 
items = list(queue)
 - 
queue.clear()
 - 
for data in items:
 - 
await send_report(channel_layer, data)
 
routings.py:
用于定义 websocket 的后端接收路径,并连接消费者
- 
from django.urls import re_path
 - 
from app01.consumers import detection_consumer, report_consumer
 - 
 - 
websocket_urlpatterns = [
 - 
re_path(r’^ws/detection/$’, detection_consumer.DetectionStreamConsumer.as_asgi()),
 - 
re_path(r’^ws/report/$’, report_consumer.ReportConsumer.as_asgi()),
 - 
]
 
urls.py:
用于接收前端发送的 http 请求,并与 view.py 中的视图函数建立联系
- 
“””
 - 
URL configuration for DJI_yolo project.
 - 
 - 
The `urlpatterns` list routes URLs to views. For more information please see:
 - 
https://docs.djangoproject.com/en/5.1/topics/http/urls/
 - 
Examples:
 - 
Function views
 - 
1. Add an import: from my_app import views
 - 
2. Add a URL to urlpatterns: path(”, views.home, name=’home’)
 - 
Class-based views
 - 
1. Add an import: from other_app.views import Home
 - 
2. Add a URL to urlpatterns: path(”, Home.as_view(), name=’home’)
 - 
Including another URLconf
 - 
1. Import the include() function: from django.urls import include, path
 - 
2. Add a URL to urlpatqterns: path(‘blog/’, include(‘blog.urls’))
 - 
“””
 - 
from django.conf.urls.static import static
 - 
from django.contrib import admin
 - 
from django.urls import path
 - 
 - 
from DJI_yolo import settings
 - 
from app01 import views
 - 
urlpatterns = [
 - 
path(“admin/”, admin.site.urls),
 - 
path(‘csrf/’, views.get_csrf_token, name=‘get_csrf_token’),
 - 
# 推流测试
 - 
path(‘push_stream/’, views.push_stream, name=‘push_stream’),
 - 
 - 
# 媒体库管理
 - 
path(‘api4/date-folders/’, views.get_date_folders, name=‘date-folders’),
 - 
path(‘api4/time-folders/<str:date>/’, views.get_time_folders, name=‘time-folders’),
 - 
path(‘api4/files/<str:date>/<str:time_range>/<str:category>/count/’, views.get_file_count, name=‘file-count’),
 - 
path(‘api4/files/<str:date>/<str:time_range>/<str:category>/’, views.get_files, name=‘files’),
 - 
path(‘api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/’, views.delete_file,
 - 
name=‘delete-file’),
 - 
path(‘delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>’, views.delete_file,
 - 
name=‘delete-file’), # 添加一个用于删除文件的 URL 路径
 - 
 - 
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
 
views.py:
用于处理前端请求,这里报告推流、媒体管理系统、原始视频存储逻辑等等。
- 
# views.py
 - 
from datetime import datetime
 - 
from pathlib import Path
 - 
import urllib.parse
 - 
from django.middleware.csrf import get_token
 - 
import os
 - 
import subprocess
 - 
import threading
 - 
from django.http import JsonResponse
 - 
from django.conf import settings
 - 
import asyncio
 - 
from .consumers.detection_consumer import DetectionStreamConsumer
 - 
from .utils import create_date_folder, create_time_folder
 - 
from .stream_status import stream_status
 - 
 - 
MEDIA_ROOT = Path(settings.MEDIA_ROOT)
 - 
 - 
def get_csrf_token(request):
 - 
token = get_token(request)
 - 
print(token)
 - 
return JsonResponse({‘csrfToken’: token})
 - 
 - 
def push_stream(request):
 - 
print(“前端返回本地视频,准备推流”)
 - 
video_file = request.FILES.get(‘video’)
 - 
if not video_file:
 - 
return JsonResponse({‘error’: ‘未上传视频文件’}, status=400)
 - 
 - 
# ——————- 创建精确时间文件夹 ——————-
 - 
start_datetime = datetime.now() # 记录精确到秒的开始时间
 - 
 - 
date_str = start_datetime.strftime(‘%Y-%m-%d’)
 - 
date_path = create_date_folder(date_str)
 - 
stream_status.start_time = datetime.now()
 - 
# 创建带时分秒的文件夹(如 15:30:45)
 - 
time_path, time_folder = create_time_folder(date_path, start_datetime)
 - 
 - 
# 创建三类子文件夹
 - 
original_path = os.path.join(time_path, ‘原视频’)
 - 
recognized_path = os.path.join(time_path, ‘识别视频’)
 - 
image_path = os.path.join(time_path, ‘关键帧图片’)
 - 
stream_status.image_path = image_path # 保存到全局状态
 - 
for folder in [original_path, recognized_path, image_path]:
 - 
os.makedirs(folder, exist_ok=True)
 - 
 - 
# ——————- 保存上传视频 ——————-
 - 
original_video_name = f”original_{start_datetime.strftime(‘%H%M%S’)}.mp4″
 - 
original_video_path = os.path.join(original_path, original_video_name)
 - 
with open(original_video_path, ‘wb’) as f:
 - 
for chunk in video_file.chunks():
 - 
f.write(chunk)
 - 
 - 
# ——————- 推流配置 ——————-
 - 
push_url = “rtmp://127.0.0.1:1935/live/stream”
 - 
recognized_video_name = f”recognized_{start_datetime.strftime(‘%H%M%S’)}.mp4″
 - 
recognized_video_path = os.path.join(recognized_path, recognized_video_name)
 - 
 - 
 - 
command = [
 - 
“ffmpeg”,
 - 
“-re”,
 - 
“-i”, original_video_path,
 - 
“-c:v”, “libx264”,
 - 
“-preset”, “ultrafast”,
 - 
“-tune”, “zerolatency”,
 - 
“-g”, “50”,
 - 
“-r”, “30”,
 - 
“-an”,
 - 
“-f”, “flv”,
 - 
push_url,
 - 
“-f”, “mp4”,
 - 
recognized_video_path
 - 
]
 - 
 - 
try:
 - 
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 - 
 - 
# 保存完整时间状态
 - 
stream_status.is_streaming = True
 - 
stream_status.start_datetime = start_datetime # 保存完整 datetime
 - 
stream_status.time_path = time_path
 - 
stream_status.recognized_path = recognized_path
 - 
stream_status.image_path = image_path
 - 
 - 
detection_thread = threading.Thread(
 - 
target=run_detection_in_background,
 - 
daemon=True
 - 
)
 - 
detection_thread.start()
 - 
 - 
return JsonResponse({
 - 
‘status’: ‘success’,
 - 
‘date_folder’: date_str,
 - 
‘time_folder’: time_folder # 格式如 15:30:45
 - 
})
 - 
except Exception as e:
 - 
return JsonResponse({‘error’: str(e)}, status=500)
 - 
 - 
def run_detection_in_background():
 - 
try:
 - 
loop = asyncio.new_event_loop()
 - 
asyncio.set_event_loop(loop)
 - 
loop.run_until_complete(DetectionStreamConsumer().start_detection())
 - 
except Exception as e:
 - 
print(f”后台检测线程异常: {e}“)
 - 
 - 
def get_date_folders(request):
 - 
page = int(request.GET.get(‘page’, 1))
 - 
page_size = int(request.GET.get(‘page_size’, 10))
 - 
MEDIA_ROOT_PATH = Path(r’F:\FullStack\Django\DJI_yolo\media’)
 - 
 - 
date_folders = []
 - 
for name in os.listdir(MEDIA_ROOT_PATH):
 - 
folder_path = MEDIA_ROOT_PATH / name
 - 
if folder_path.is_dir() and len(name.split(‘-‘)) == 3:
 - 
date_folders.append({‘date’: name})
 - 
 - 
date_folders.sort(key=lambda x: x[‘date’], reverse=True)
 - 
start = (page – 1) * page_size
 - 
end = start + page_size
 - 
return JsonResponse({
 - 
‘data’: date_folders[start:end],
 - 
‘total’: len(date_folders)
 - 
})
 - 
 - 
def get_time_folders(request, date):
 - 
date_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date
 - 
if not date_path.is_dir():
 - 
return JsonResponse({‘error’: ‘日期文件夹不存在’}, status=404)
 - 
 - 
page = int(request.GET.get(‘page’, 1))
 - 
page_size = int(request.GET.get(‘page_size’, 10))
 - 
time_folders = []
 - 
 - 
for name in os.listdir(date_path):
 - 
time_path = date_path / name
 - 
if time_path.is_dir():
 - 
original_count = len(os.listdir(time_path / ‘原视频’)) if (time_path / ‘原视频’).is_dir() else 0
 - 
recognized_count = len(os.listdir(time_path / ‘识别视频’)) if (time_path / ‘识别视频’).is_dir() else 0
 - 
image_count = len(os.listdir(time_path / ‘关键帧图片’)) if (time_path / ‘关键帧图片’).is_dir() else 0
 - 
 - 
time_folders.append({
 - 
‘time_range’: name, # 直接使用时间区间名称
 - 
‘original_count’: original_count,
 - 
‘recognized_count’: recognized_count,
 - 
‘image_count’: image_count
 - 
})
 - 
 - 
time_folders.sort(key=lambda x: x[‘time_range’], reverse=True)
 - 
start = (page – 1) * page_size
 - 
end = start + page_size
 - 
return JsonResponse({
 - 
‘data’: time_folders[start:end],
 - 
‘total’: len(time_folders)
 - 
})
 - 
 - 
def get_file_count(request, date, time_range, category):
 - 
category_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / time_range / category
 - 
if not category_path.is_dir():
 - 
return JsonResponse({‘count’: 0})
 - 
 - 
count = len(os.listdir(category_path))
 - 
return JsonResponse({‘count’: count})
 - 
 - 
def get_files(request, date, time_range, category):
 - 
try:
 - 
decoded_time_range = urllib.parse.unquote(time_range)
 - 
category_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / decoded_time_range / category
 - 
 - 
if not category_path.is_dir():
 - 
return JsonResponse({‘error’: ‘文件类别不存在’}, status=404)
 - 
 - 
page = int(request.GET.get(‘page’, 1))
 - 
page_size = int(request.GET.get(‘page_size’, 10))
 - 
files = []
 - 
 - 
for filename in os.listdir(category_path):
 - 
file_path = category_path / filename
 - 
if file_path.is_file():
 - 
encoded_time_range = urllib.parse.quote(decoded_time_range)
 - 
files.append({
 - 
‘name’: filename,
 - 
‘url’: f’http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}‘
 - 
})
 - 
 - 
start = (page – 1) * page_size
 - 
end = start + page_size
 - 
return JsonResponse({
 - 
‘data’: files[start:end],
 - 
‘total’: len(files),
 - 
‘status’: ‘success’
 - 
})
 - 
except Exception as e:
 - 
print(f”文件列表错误: {str(e)}“)
 - 
return JsonResponse({
 - 
‘error’: ‘服务器内部错误’,
 - 
‘detail’: str(e)
 - 
}, status=500)
 - 
 - 
def delete_file(request, date, time_range, category, filename):
 - 
try:
 - 
decoded_time_range = urllib.parse.unquote(time_range)
 - 
file_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / decoded_time_range / category / filename
 - 
if not file_path.exists():
 - 
return JsonResponse({‘error’: ‘文件不存在’}, status=404)
 - 
 - 
os.remove(file_path)
 - 
return JsonResponse({‘status’: ‘success’})
 - 
except Exception as e:
 - 
return JsonResponse({‘error’: str(e)}, status=500)
 
python
运行
 - 
 
    		
        