本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示,同时,统计目标数量以及违停数量,生成结果评估,一并发送到前端展示。对于本文任何技术上的空缺,可在博主主页前面博客寻找,有任何问题欢迎私信或评论区讨论!!!
涉及技术栈:
Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+异步+多进程+flv.js+node.js
web端推拉流测试、抽帧识别计数,一键式生成巡检报告
项目结构(Django):
├── DJI_yolo
│ ├── __init__.py
│ ├── __pycache__
│ ├── asgi.py
│ ├── settings.py
│ ├── templates
│ ├── urls.py
│ └── wsgi.py
├── app01
│ ├── __init__.py
│ ├── __pycache__
│ ├── admin.py
│ ├── apps.py
│ ├── car_best.pt
│ ├── consumers
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ ├── detection_consumer.py
│ │ └── report_consumer.py
│ ├── migrations
│ │ ├── __init__.py
│ │ └── __pycache__
│ ├── models.py
│ ├── pictures
│ ├── routings.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ └── detection_engine.py
│ ├── stream_status.py
│ ├── tests.py
│ ├── utils.py
│ └── views.py
├── manage.py
├── media
│ ├── 2025-06-02
│ │ ├── 20时14分54秒-20时17分03秒
│ │ │ ├── 关键帧图片
│ │ │ ├── 原视频
│ │ │ └── 识别视频
│ │ ├── 20时26分11秒-20时29分00秒
│ │ │ ├── 关键帧图片
│ │ │ ├── 原视频
│ │ │ └── 识别视频
存在的问题,亟需解决:
1、后端逐帧识别并返回前端,前端显示卡顿,考虑跳帧识别,一秒识别一帧,剩余原始帧直接返回。
2、曾尝试过使用 yolo 中的 model 类中的 track 方法进行跨帧追踪,为每帧每个目标进行编号(id),提取帧中目标特征(速度、目标面积、唯位移等等),进行特征化工程,从而判断相邻两识别帧中的某两目标是否为同一目标。由于添加追踪算法,单帧识别时间过长,单帧识别时间大于拉流帧传输时间,导致进程堵塞,程序崩溃。后续采取措施改进。
3、模型并未做违停检测,考虑重新训练模型,添加违停训练集,重新训练,顺便搭配 yolo12 环境,用 yolo12 进行训练。
4、Django 后端接受到流,并不能直接开始识别,而是先用四五秒时间加载模型,需要优化。
5、执行任务、完成任务到数据存储、前端显示巡检报告,延迟较高,需要优化。
6、为后端添加车辆运动状态算法,若连续识别的两个帧同一目标静止,并且识别为违停,则判断为违停;若非静止但识别违停,不做处理。相对静止判断?无人机在动,无法建立参考系。
7、进行车牌识别。
代码及粗略解释:
detection_consumer.py:
此处代码使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的类进行帧识别和计数,然后判断计数,调用讯飞星火 API 生成 AI 答复,获取关键帧,将所有信息封装成一个字典,并传给前端,以用于违停报告生成。同时,将实时识别帧通过 websocket 的消费者传给前端显示。
-
import asyncio
-
import base64
-
import os
-
import subprocess
-
import time
-
import traceback
-
from datetime import datetime
-
from pathlib import Path
-
from .report_consumer import ReportConsumer
-
import numpy as np
-
import cv2
-
from channels.generic.websocket import AsyncWebsocketConsumer
-
from channels.layers import get_channel_layer
-
from app01.services.detection_engine import DetectionEngine, logger
-
from concurrent.futures import ThreadPoolExecutor
-
from app01.utils import rename_time_folder # 导入重命名函数
-
from app01.stream_status import stream_status, status_lock # 从新模块导入
-
import requests
-
-
RTMP_URL = “rtmp://127.0.0.1:1935/live/stream”
-
channel_layer = get_channel_layer()
-
-
class DetectionStreamConsumer(AsyncWebsocketConsumer):
-
async def connect(self):
-
await self.channel_layer.group_add(“detection_group”, self.channel_name)
-
await self.accept()
-
print(“WebSocket 连接建立成功”)
-
-
async def disconnect(self, code):
-
await self.channel_layer.group_discard(“detection_group”, self.channel_name)
-
print(“WebSocket 连接关闭”)
-
-
async def start_detection(self):
-
print(“开始检测帧流…”)
-
await self.monitor_stream_status()
-
-
async def monitor_stream_status(self):
-
while True:
-
if stream_status.is_streaming:
-
logger.info(“检测到推流,开始处理…”)
-
await self.start_detection_stream()
-
else:
-
logger.info(“当前无推流,等待中…”)
-
await asyncio.sleep(1)
-
-
async def start_detection_stream(self, all_dict=None):
-
ffmpeg_pull_command = [
-
“ffmpeg”,
-
“-c:v”, “h264”,
-
“-i”, RTMP_URL,
-
“-f”, “rawvideo”,
-
“-pix_fmt”, “bgr24”,
-
“-s”, “640×360”,
-
“-r”, “5”,
-
“-vcodec”, “rawvideo”,
-
“-an”,
-
“-flags”, “+low_delay”,
-
“-tune”, “zerolatency”,
-
“-“
-
]
-
-
try:
-
process = subprocess.Popen(
-
ffmpeg_pull_command,
-
stdout=subprocess.PIPE,
-
stderr=subprocess.DEVNULL,
-
bufsize=10 * 8
-
)
-
except Exception as e:
-
logger.error(f”FFmpeg 子进程启动失败: {e}“)
-
return
-
-
if process.stdout is None:
-
logger.error(“FFmpeg stdout 为空”)
-
return
-
-
frame_width, frame_height = 640, 360
-
frame_size = frame_width * frame_height * 3
-
executor = ThreadPoolExecutor(max_workers=4) # 最多可以同时运行 4 个线程
-
engine = DetectionEngine()
-
frame_count = 0
-
skip_interval = 1
-
-
try:
-
while stream_status.is_streaming:
-
try:
-
loop = asyncio.get_event_loop()
-
raw_frame = await asyncio.wait_for(
-
loop.run_in_executor(executor, process.stdout.read, frame_size),
-
timeout=8 # 超时检测断流
-
)
-
except asyncio.TimeoutError:
-
logger.warning(“读取帧超时,触发流结束”)
-
with status_lock:
-
stream_status.is_streaming = False
-
break
-
-
if len(raw_frame) != frame_size:
-
continue
-
-
try:
-
frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))
-
except Exception as e:
-
logger.warning(f”帧解析失败: {e}“)
-
continue
-
-
frame_count += 1
-
if frame_count % skip_interval != 0:
-
continue
-
-
result = await loop.run_in_executor(executor, engine.process_frame, frame)
-
# 增加空值检查
-
if result is None:
-
logger.warning(“检测结果为空,跳过处理”)
-
continue
-
processed_frame, detected_classes = result
-
-
# 更新车辆统计(仅按类别)
-
with DetectionEngine.counter_lock:
-
for class_id in detected_classes:
-
if class_id == 0:
-
DetectionEngine.total_count[‘car’] += 1
-
DetectionEngine.total_count[‘total’] += 1
-
elif class_id == 1:
-
DetectionEngine.total_count[‘bus’] += 1
-
DetectionEngine.total_count[‘total’] += 1
-
elif class_id == 2:
-
DetectionEngine.total_count[‘truck’] += 1
-
DetectionEngine.total_count[‘total’] += 1
-
elif class_id == 3:
-
DetectionEngine.total_count[‘van’] += 1
-
DetectionEngine.total_count[‘total’] += 1
-
-
_, jpeg = cv2.imencode(‘.jpg’, processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
-
if channel_layer:
-
await channel_layer.group_send(
-
“detection_group”,
-
{“type”: “send_frame”, “frame”: jpeg.tobytes()}
-
)
-
except Exception as e:
-
logger.error(f”检测处理错误: {e}“)
-
logger.error(traceback.format_exc())
-
finally:
-
logger.warning(“流结束,进入处理…”)
-
if process.returncode is None:
-
process.kill()
-
logger.warning(“FFmpeg 进程已终止”)
-
-
# 延迟一小段时间,确保文件操作完全释放
-
time.sleep(1)
-
-
stream_status.is_streaming = False
-
stream_status.end_datetime = datetime.now() # 记录结束时间
-
-
logger.warning(“正在获取三个关键帧…”)
-
# 找3个关键帧,必须在重命名前执行
-
folder = Path(f”{stream_status.image_path}“)
-
files = [f for f in folder.iterdir() if f.is_file()]
-
if not files:
-
return None, None, None
-
# 按修改时间排序
-
files.sort(key=lambda x: x.stat().st_mtime)
-
first = files[0]
-
last = files[-1]
-
middle = files[len(files) // 2]
-
-
# 转换为Base64
-
stream_status.image_li = [
-
self.image_to_base64(str(f)) # 传入字符串路径,避免Path对象序列化问题
-
for f in [first, middle, last]
-
]
-
-
logger.warning(“正在重命名文件夹…”)
-
# 先复制 time_path,因为后续可能被修改
-
time_path = stream_status.time_path
-
start_datetime = stream_status.start_datetime
-
end_datetime = stream_status.end_datetime
-
save_path = “”
-
# 调用重命名函数(使用完整 datetime)
-
if time_path and start_datetime:
-
-
path = rename_time_folder(
-
time_path,
-
start_datetime, # 传入开始时间
-
end_datetime # 传入结束时间
-
)
-
save_path = path
-
logger.warning(
-
f”文件夹已重命名为 {start_datetime.strftime(‘%H时%M分%S秒’)}–{end_datetime.strftime(‘%H时%M分%S秒’)}“)
-
-
try:
-
logger.warning(“正在更新数据…”)
-
# 更新识别结果
-
stats = self.correct_overcounted_stats(DetectionEngine.total_count)
-
print(“stats:”, stats)
-
stream_status.total_stats = stats
-
print(“stream_status.total_stats:”, stream_status.total_stats)
-
self.save_vehicle_stats(stats, save_path)
-
-
except Exception as e:
-
logger.error(f”保存统计信息失败: {e}“)
-
print(“正在获取AI答复…”)
-
self.AI_response()
-
-
# 定义总数据,并序列化
-
all_dict = {
-
‘datetime’: stream_status.start_time.strftime(“%Y-%m-%d %H:%M:%S.%f”), # 序列化,转为字符串
-
‘image_li’: [bs for bs in stream_status.image_li], # 关键帧转二进制
-
‘detect_car’: [_ for _ in stream_status.total_stats.values()],
-
‘Al_response’: stream_status.AI_talk,
-
}
-
-
print(“all_dict:”, all_dict)
-
print(“正在发送信息…”)
-
await ReportConsumer.send_report_data(all_dict)
-
-
executor.shutdown(wait=False)
-
DetectionEngine.reset_stats()
-
stream_status.reset_all_dict()
-
-
def save_vehicle_stats(self, stats, save_path=None):
-
# 如果未提供路径,使用默认的 time_path
-
if not save_path:
-
save_path = stream_status.time_path
-
-
if not save_path:
-
logger.warning(“无法保存统计信息:路径为空”)
-
return
-
-
stats_path = os.path.join(save_path, ‘vehicle_stats.txt’)
-
try:
-
with open(stats_path, ‘w’, encoding=‘utf-8-sig’) as f:
-
f.write(“车辆统计结果\n”)
-
f.write(“————————-\n”)
-
for k, v in stats.items():
-
f.write(f”{k}: {v}\n”)
-
f.write(“————————-\n”)
-
logger.info(f”统计信息已保存至 {stats_path}“)
-
except Exception as e:
-
logger.error(f”写入统计信息失败: {e}“)
-
-
-
def AI_response(self):
-
“””获取AI回复”””
-
url = “https://spark-api-open.xf-yun.com/v1/chat/completions”
-
headers = {
-
“Authorization”: “Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv”,
-
“Content-Type”: “application/json”,
-
}
-
content = (
-
f’我现在要做一个无人机城市交通智慧巡检报告,现在巡检结果是:巡检时间:{stream_status.start_time}‘
-
f’小汽车有{stream_status.total_stats[“car”]}辆,面包车有{stream_status.total_stats[“van”]}辆,’
-
f’公交车有{stream_status.total_stats[“bus”]}辆,卡车有{stream_status.total_stats[“truck”]}辆,’
-
f’识别到的违停车辆共有{stream_status.total_stats[“illegally_car”]}辆。’
-
f’帮我生成一段结论,要求不要有废话,也不要写具体那四个类别识别到的车辆数,字数200字,语言精炼严谨’
-
)
-
-
question = {
-
“role”: “user”,
-
“content”: content
-
}
-
-
data = {
-
“max_tokens”: 4096,
-
“top_k”: 4,
-
“messages”: [question],
-
“temperature”: 0.5,
-
“model”: “4.0Ultra”,
-
“stream”: False,
-
}
-
response = requests.post(url, headers=headers, json=data)
-
response.raise_for_status()
-
result = response.json()
-
-
# 提取模型输出的内容
-
reply = result.get(‘choices’, [{}])[0].get(‘message’, {}).get(‘content’, ”).strip()
-
print(reply)
-
stream_status.AI_talk = reply
-
-
# 读取图片文件并转换为 Base64
-
# 静态方法不应接收self参数
-
def image_to_base64(image_path):
-
with open(image_path, “rb”) as img_file:
-
base64_str = base64.b64encode(img_file.read()).decode(“utf-8”)
-
return f”data:image/jpeg;base64,{base64_str}“
-
-
# 去掉self第一个参数
-
def correct_overcounted_stats(stats, avg_appearance=6):
-
“””
-
简单修正严重重复统计的结果
-
“””
-
corrected = {}
-
for cls in [‘car’, ‘van’, ‘bus’, ‘truck’]:
-
if cls in stats:
-
corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))
-
-
# 可选:重新计算 total
-
corrected[‘total’] = sum(corrected.values())
-
corrected[‘illegally_car’] = int(corrected[‘total’] / 100)
-
print(corrected)
-
-
return corrected
-
-
async def send_frame(self, event):
-
frame_bytes = event[“frame”]
-
await self.send(bytes_data=frame_bytes)
stream_status.py:
本文件用于 Django 项目全局状态管理,相当于 vue3 中的 pinia , is_streaming 是前端推本地视频、后端上传 SRS 后,变为 True ,后端因为 is_streaming = True 而开始拉流,当超过 6 秒未拉到流,判断流结束,is_streaming = False。还定义了全局状态锁、状态重置方法、时间获取方法、文件夹名获取方法、状态实例。全局状态锁是为了防止多线程修改同一个共享变量,只有with status_lock:在 with 块内,只有当前线程持有锁时才能运行代码,其他线程必须等待,相当于在此处把锁”锁“起来了。
-
import threading
-
-
# 全局状态锁和状态对象
-
status_lock = threading.Lock()
-
class StreamStatus:
-
-
def __init__(self):
-
self.is_streaming = False # 是否正在推流/检测
-
self.start_time = None # 推流开始时间(datetime对象)
-
self.end_time = None # 推流结束时间(datetime对象)
-
self.time_path = None # 当前时间文件夹路径(如 Media/2025-06-01/12点)
-
self.total_stats = { # 车辆统计
-
“car”: 0,
-
“van”: 0,
-
“bus”: 0,
-
“truck”: 0,
-
“total”: 0,
-
“illegally_car”: 0,
-
}
-
self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
-
self.image_path = None
-
self.image_li = []
-
self.AI_talk = None
-
self.all_dict = {}
-
-
-
def date_folder(self):
-
“””获取日期文件夹名称(如 2025-06-01)”””
-
if self.start_time:
-
return self.start_time.strftime(“%Y-%m-%d”)
-
return None
-
-
-
def start_hour(self):
-
“””获取开始小时(如 12)”””
-
if self.start_time:
-
return self.start_time.hour
-
return None
-
-
def reset(self):
-
“””重置状态(用于新任务开始)”””
-
self.is_streaming = False
-
self.start_time = None
-
self.end_time = None
-
self.time_path = None
-
self.total_stats = {k: 0 for k in self.total_stats}
-
self.last_frame_time = None
-
-
def reset_all_dict(self):
-
self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
-
self.image_path = None
-
self.image_li = []
-
self.AI_talk = None
-
self.all_dict = {}
-
self.all_dict = {}
-
# 全局唯一的状态实例
-
stream_status = StreamStatus()
python
detection_engine.py:
此处是 detection_consumer.py 使用的服务代码块,用于为其提供帧识别服务,consumer 拉到的每一个流都会调用该类中的 process_frame 类方法,并同时进行计数和关键帧保存
-
import threading
-
import time
-
import cv2
-
from ultralytics import YOLO
-
import logging
-
import os
-
from app01.stream_status import stream_status, status_lock # 从新模块导入
-
-
logger = logging.getLogger(__name__)
-
-
MODEL_PATH = “F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt”
-
model = YOLO(MODEL_PATH)
-
-
# 关键帧存储位置
-
pictures_dir = “pictures”
-
if not os.path.exists(pictures_dir):
-
os.makedirs(pictures_dir)
-
-
-
class DetectionEngine:
-
frame_counter = 1
-
counter_lock = threading.Lock() # 线程锁,保证计数器安全
-
-
# 新增:全局计数器和已统计 ID
-
total_count = {
-
‘car’: 0,
-
‘van’: 0,
-
‘bus’: 0,
-
‘truck’: 0,
-
“total”: 0,
-
“illegally_car”: 0
-
}
-
# 下次任务重置
-
-
def reset_stats(cls):
-
with cls.counter_lock:
-
for k in cls.total_count:
-
cls.total_count[k] = 0
-
-
# 下次任务重置
-
-
def update_stats(cls, vehicle_type):
-
with cls.counter_lock:
-
if vehicle_type in cls.total_count:
-
cls.total_count[vehicle_type] += 1
-
cls.total_count[“total”] += 1
-
-
def __init__(self):
-
self.model = model
-
# 调整检测参数,减少计算量
-
self.detect_params = {
-
‘conf’: 0.3, # 提高置信度阈值,减少无效检测
-
‘iou’: 0.5, # 调整 NMS 阈值
-
‘imgsz’: [384, 640], # 输入尺寸,降低分辨率
-
‘classes’: [0, 1, 2, 3], # 仅检测车辆类别(2: car, 5: bus, 7: truck)
-
‘device’: 0, # 使用 GPU
-
‘half’: True, # 使用 FP16 精度
-
‘show’: False # 关闭实时显示,减少开销
-
}
-
-
def process_frame(self, frame):
-
“””处理单帧图像:YOLO推理 + 保存结果到关键帧图片文件夹”””
-
results = model(frame)
-
detected_objects = []
-
annotated_frame = frame.copy() # 复制原始帧以防止修改原图
-
# 防御性检查:确保 results 非空且包含有效数据
-
if not results or len(results) == 0:
-
logger.warning(“未获取到检测结果,跳过处理”)
-
return frame, []
-
-
boxes = results[0].boxes
-
if boxes is None or len(boxes) == 0:
-
logger.warning(“检测结果为空,跳过处理”)
-
return frame, []
-
try:
-
# 假设类别信息存储在 cls 属性中
-
classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, ‘cls’) and boxes.cls is not None else []
-
-
for class_id in classes:
-
detected_objects.append(class_id)
-
-
annotated_frame = results[0].plot() # 绘制检测框
-
except Exception as e:
-
logger.error(f”解析检测数据失败: {e}“)
-
return annotated_frame, []
-
with status_lock:
-
car_count = sum(1 for cls in detected_objects if cls == 0)
-
if car_count > 18:
-
try:
-
timestamp = int(time.time())
-
save_path = os.path.join(stream_status.image_path, f”keyframe_{timestamp}.jpg”)
-
cv2.imwrite(save_path, annotated_frame)
-
logger.info(f”关键帧保存成功: {save_path}“)
-
except Exception as e:
-
logger.error(f”保存关键帧失败: {e}“)
-
-
return annotated_frame, detected_objects
repoet_consumer.py:
用于将前面 detection_consumer.py 中的字典数据发送给前端,首先会将数据缓存到后端,等到前端消费者建立成功,立马发送出去。
-
import asyncio
-
-
from channels.generic.websocket import AsyncWebsocketConsumer
-
from channels.layers import get_channel_layer
-
import json
-
-
# 缓存队列:用于保存尚未发送的数据
-
queue = []
-
# 记录是否有消费者连接
-
has_consumer = False
-
-
class ReportConsumer(AsyncWebsocketConsumer):
-
async def connect(self):
-
global has_consumer
-
# 加入指定组
-
await self.channel_layer.group_add(“report_group”, self.channel_name)
-
await self.accept()
-
print(“巡检报告WebSocket 已连接”)
-
# 设置有消费者连接的标志
-
has_consumer = True
-
# 尝试发送缓存数据
-
await flush_queue(self.channel_layer)
-
-
async def disconnect(self, close_code):
-
global has_consumer
-
# 移除组成员
-
await self.channel_layer.group_discard(“report_group”, self.channel_name)
-
print(“巡检报告WebSocket 断开连接”)
-
# 重置消费者连接标志
-
has_consumer = False
-
-
async def receive(self, text_data=None, bytes_data=None):
-
pass
-
-
async def send_report(self, event):
-
“””处理 send_report 类型的消息并发送给客户端”””
-
data = event[‘data’]
-
print(“【发送到客户端】”, data)
-
-
# 转换为JSON字符串发送
-
try:
-
# 处理可能包含的非JSON可序列化对象
-
# 这里假设 image_li 包含 Path 对象,需要转换为字符串
-
if ‘image_li’ in data:
-
data[‘image_li’] = [str(path) for path in data[‘image_li’]]
-
-
await self.send(text_data=json.dumps(data))
-
except Exception as e:
-
print(f”【发送到客户端失败】{e}“)
-
-
-
async def send_report_data(cls, data):
-
“””供其他模块调用的方法”””
-
global queue, has_consumer
-
# 不再检查是否有人连接,直接发送或缓存,生产模式用Redis
-
channel_layer = get_channel_layer()
-
-
# 如果没有消费者连接,将数据加入队列
-
if not has_consumer:
-
queue.append(data)
-
print(f”【数据已缓存】等待消费者连接: {data}“)
-
else:
-
# 有消费者连接,直接发送
-
await send_report(channel_layer, data)
-
-
-
# ========== 全局函数 ==========
-
# 定期检查并发送缓存数据的任务
-
async def queue_check_task():
-
channel_layer = get_channel_layer()
-
while True:
-
await asyncio.sleep(1) # 每秒检查一次
-
if queue and has_consumer:
-
await flush_queue(channel_layer)
-
# 在应用启动时启动队列检查任务
-
async def start_queue_check():
-
asyncio.create_task(queue_check_task())
-
async def send_report(channel_layer, data):
-
try:
-
await channel_layer.group_send(
-
“report_group”,
-
{
-
“type”: “send_report”,
-
“data”: data,
-
},
-
)
-
print(“【发送成功】”, data)
-
except Exception as e:
-
print(f”【发送失败】{e}“)
-
-
-
async def flush_queue(channel_layer):
-
global queue
-
if not queue:
-
return
-
-
items = list(queue)
-
queue.clear()
-
for data in items:
-
await send_report(channel_layer, data)
routings.py:
用于定义 websocket 的后端接收路径,并连接消费者
-
from django.urls import re_path
-
from app01.consumers import detection_consumer, report_consumer
-
-
websocket_urlpatterns = [
-
re_path(r’^ws/detection/$’, detection_consumer.DetectionStreamConsumer.as_asgi()),
-
re_path(r’^ws/report/$’, report_consumer.ReportConsumer.as_asgi()),
-
]
urls.py:
用于接收前端发送的 http 请求,并与 view.py 中的视图函数建立联系
-
“””
-
URL configuration for DJI_yolo project.
-
-
The `urlpatterns` list routes URLs to views. For more information please see:
-
https://docs.djangoproject.com/en/5.1/topics/http/urls/
-
Examples:
-
Function views
-
1. Add an import: from my_app import views
-
2. Add a URL to urlpatterns: path(”, views.home, name=’home’)
-
Class-based views
-
1. Add an import: from other_app.views import Home
-
2. Add a URL to urlpatterns: path(”, Home.as_view(), name=’home’)
-
Including another URLconf
-
1. Import the include() function: from django.urls import include, path
-
2. Add a URL to urlpatqterns: path(‘blog/’, include(‘blog.urls’))
-
“””
-
from django.conf.urls.static import static
-
from django.contrib import admin
-
from django.urls import path
-
-
from DJI_yolo import settings
-
from app01 import views
-
urlpatterns = [
-
path(“admin/”, admin.site.urls),
-
path(‘csrf/’, views.get_csrf_token, name=‘get_csrf_token’),
-
# 推流测试
-
path(‘push_stream/’, views.push_stream, name=‘push_stream’),
-
-
# 媒体库管理
-
path(‘api4/date-folders/’, views.get_date_folders, name=‘date-folders’),
-
path(‘api4/time-folders/<str:date>/’, views.get_time_folders, name=‘time-folders’),
-
path(‘api4/files/<str:date>/<str:time_range>/<str:category>/count/’, views.get_file_count, name=‘file-count’),
-
path(‘api4/files/<str:date>/<str:time_range>/<str:category>/’, views.get_files, name=‘files’),
-
path(‘api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/’, views.delete_file,
-
name=‘delete-file’),
-
path(‘delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>’, views.delete_file,
-
name=‘delete-file’), # 添加一个用于删除文件的 URL 路径
-
-
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
views.py:
用于处理前端请求,这里报告推流、媒体管理系统、原始视频存储逻辑等等。
-
# views.py
-
from datetime import datetime
-
from pathlib import Path
-
import urllib.parse
-
from django.middleware.csrf import get_token
-
import os
-
import subprocess
-
import threading
-
from django.http import JsonResponse
-
from django.conf import settings
-
import asyncio
-
from .consumers.detection_consumer import DetectionStreamConsumer
-
from .utils import create_date_folder, create_time_folder
-
from .stream_status import stream_status
-
-
MEDIA_ROOT = Path(settings.MEDIA_ROOT)
-
-
def get_csrf_token(request):
-
token = get_token(request)
-
print(token)
-
return JsonResponse({‘csrfToken’: token})
-
-
def push_stream(request):
-
print(“前端返回本地视频,准备推流”)
-
video_file = request.FILES.get(‘video’)
-
if not video_file:
-
return JsonResponse({‘error’: ‘未上传视频文件’}, status=400)
-
-
# ——————- 创建精确时间文件夹 ——————-
-
start_datetime = datetime.now() # 记录精确到秒的开始时间
-
-
date_str = start_datetime.strftime(‘%Y-%m-%d’)
-
date_path = create_date_folder(date_str)
-
stream_status.start_time = datetime.now()
-
# 创建带时分秒的文件夹(如 15:30:45)
-
time_path, time_folder = create_time_folder(date_path, start_datetime)
-
-
# 创建三类子文件夹
-
original_path = os.path.join(time_path, ‘原视频’)
-
recognized_path = os.path.join(time_path, ‘识别视频’)
-
image_path = os.path.join(time_path, ‘关键帧图片’)
-
stream_status.image_path = image_path # 保存到全局状态
-
for folder in [original_path, recognized_path, image_path]:
-
os.makedirs(folder, exist_ok=True)
-
-
# ——————- 保存上传视频 ——————-
-
original_video_name = f”original_{start_datetime.strftime(‘%H%M%S’)}.mp4″
-
original_video_path = os.path.join(original_path, original_video_name)
-
with open(original_video_path, ‘wb’) as f:
-
for chunk in video_file.chunks():
-
f.write(chunk)
-
-
# ——————- 推流配置 ——————-
-
push_url = “rtmp://127.0.0.1:1935/live/stream”
-
recognized_video_name = f”recognized_{start_datetime.strftime(‘%H%M%S’)}.mp4″
-
recognized_video_path = os.path.join(recognized_path, recognized_video_name)
-
-
-
command = [
-
“ffmpeg”,
-
“-re”,
-
“-i”, original_video_path,
-
“-c:v”, “libx264”,
-
“-preset”, “ultrafast”,
-
“-tune”, “zerolatency”,
-
“-g”, “50”,
-
“-r”, “30”,
-
“-an”,
-
“-f”, “flv”,
-
push_url,
-
“-f”, “mp4”,
-
recognized_video_path
-
]
-
-
try:
-
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-
-
# 保存完整时间状态
-
stream_status.is_streaming = True
-
stream_status.start_datetime = start_datetime # 保存完整 datetime
-
stream_status.time_path = time_path
-
stream_status.recognized_path = recognized_path
-
stream_status.image_path = image_path
-
-
detection_thread = threading.Thread(
-
target=run_detection_in_background,
-
daemon=True
-
)
-
detection_thread.start()
-
-
return JsonResponse({
-
‘status’: ‘success’,
-
‘date_folder’: date_str,
-
‘time_folder’: time_folder # 格式如 15:30:45
-
})
-
except Exception as e:
-
return JsonResponse({‘error’: str(e)}, status=500)
-
-
def run_detection_in_background():
-
try:
-
loop = asyncio.new_event_loop()
-
asyncio.set_event_loop(loop)
-
loop.run_until_complete(DetectionStreamConsumer().start_detection())
-
except Exception as e:
-
print(f”后台检测线程异常: {e}“)
-
-
def get_date_folders(request):
-
page = int(request.GET.get(‘page’, 1))
-
page_size = int(request.GET.get(‘page_size’, 10))
-
MEDIA_ROOT_PATH = Path(r’F:\FullStack\Django\DJI_yolo\media’)
-
-
date_folders = []
-
for name in os.listdir(MEDIA_ROOT_PATH):
-
folder_path = MEDIA_ROOT_PATH / name
-
if folder_path.is_dir() and len(name.split(‘-‘)) == 3:
-
date_folders.append({‘date’: name})
-
-
date_folders.sort(key=lambda x: x[‘date’], reverse=True)
-
start = (page – 1) * page_size
-
end = start + page_size
-
return JsonResponse({
-
‘data’: date_folders[start:end],
-
‘total’: len(date_folders)
-
})
-
-
def get_time_folders(request, date):
-
date_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date
-
if not date_path.is_dir():
-
return JsonResponse({‘error’: ‘日期文件夹不存在’}, status=404)
-
-
page = int(request.GET.get(‘page’, 1))
-
page_size = int(request.GET.get(‘page_size’, 10))
-
time_folders = []
-
-
for name in os.listdir(date_path):
-
time_path = date_path / name
-
if time_path.is_dir():
-
original_count = len(os.listdir(time_path / ‘原视频’)) if (time_path / ‘原视频’).is_dir() else 0
-
recognized_count = len(os.listdir(time_path / ‘识别视频’)) if (time_path / ‘识别视频’).is_dir() else 0
-
image_count = len(os.listdir(time_path / ‘关键帧图片’)) if (time_path / ‘关键帧图片’).is_dir() else 0
-
-
time_folders.append({
-
‘time_range’: name, # 直接使用时间区间名称
-
‘original_count’: original_count,
-
‘recognized_count’: recognized_count,
-
‘image_count’: image_count
-
})
-
-
time_folders.sort(key=lambda x: x[‘time_range’], reverse=True)
-
start = (page – 1) * page_size
-
end = start + page_size
-
return JsonResponse({
-
‘data’: time_folders[start:end],
-
‘total’: len(time_folders)
-
})
-
-
def get_file_count(request, date, time_range, category):
-
category_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / time_range / category
-
if not category_path.is_dir():
-
return JsonResponse({‘count’: 0})
-
-
count = len(os.listdir(category_path))
-
return JsonResponse({‘count’: count})
-
-
def get_files(request, date, time_range, category):
-
try:
-
decoded_time_range = urllib.parse.unquote(time_range)
-
category_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / decoded_time_range / category
-
-
if not category_path.is_dir():
-
return JsonResponse({‘error’: ‘文件类别不存在’}, status=404)
-
-
page = int(request.GET.get(‘page’, 1))
-
page_size = int(request.GET.get(‘page_size’, 10))
-
files = []
-
-
for filename in os.listdir(category_path):
-
file_path = category_path / filename
-
if file_path.is_file():
-
encoded_time_range = urllib.parse.quote(decoded_time_range)
-
files.append({
-
‘name’: filename,
-
‘url’: f’http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}‘
-
})
-
-
start = (page – 1) * page_size
-
end = start + page_size
-
return JsonResponse({
-
‘data’: files[start:end],
-
‘total’: len(files),
-
‘status’: ‘success’
-
})
-
except Exception as e:
-
print(f”文件列表错误: {str(e)}“)
-
return JsonResponse({
-
‘error’: ‘服务器内部错误’,
-
‘detail’: str(e)
-
}, status=500)
-
-
def delete_file(request, date, time_range, category, filename):
-
try:
-
decoded_time_range = urllib.parse.unquote(time_range)
-
file_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / decoded_time_range / category / filename
-
if not file_path.exists():
-
return JsonResponse({‘error’: ‘文件不存在’}, status=404)
-
-
os.remove(file_path)
-
return JsonResponse({‘status’: ‘success’})
-
except Exception as e:
-
return JsonResponse({‘error’: str(e)}, status=500)
python
运行
-