web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告

本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示,同时,统计目标数量以及违停数量,生成结果评估,一并发送到前端展示。对于本文任何技术上的空缺,可在博主主页前面博客寻找,有任何问题欢迎私信或评论区讨论!!!

涉及技术栈:

Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+异步+多进程+flv.js+node.js

web端推拉流测试、抽帧识别计数,一键式生成巡检报告

项目结构(Django):
├── DJI_yolo
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── settings.py
│   ├── templates
│   ├── urls.py
│   └── wsgi.py
├── app01
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── car_best.pt
│   ├── consumers
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   ├── detection_consumer.py
│   │   └── report_consumer.py
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── pictures
│   ├── routings.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   └── detection_engine.py
│   ├── stream_status.py
│   ├── tests.py
│   ├── utils.py
│   └── views.py
├── manage.py
├── media
│   ├── 2025-06-02
│   │   ├── 20时14分54秒-20时17分03秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频
│   │   ├── 20时26分11秒-20时29分00秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频

存在的问题,亟需解决:

1、后端逐帧识别并返回前端,前端显示卡顿,考虑跳帧识别,一秒识别一帧,剩余原始帧直接返回。

2、曾尝试过使用 yolo 中的 model 类中的 track 方法进行跨帧追踪,为每帧每个目标进行编号(id),提取帧中目标特征(速度、目标面积、唯位移等等),进行特征化工程,从而判断相邻两识别帧中的某两目标是否为同一目标。由于添加追踪算法,单帧识别时间过长,单帧识别时间大于拉流帧传输时间,导致进程堵塞,程序崩溃。后续采取措施改进。

3、模型并未做违停检测,考虑重新训练模型,添加违停训练集,重新训练,顺便搭配 yolo12 环境,用 yolo12 进行训练。

4、Django 后端接受到流,并不能直接开始识别,而是先用四五秒时间加载模型,需要优化。

5、执行任务、完成任务到数据存储、前端显示巡检报告,延迟较高,需要优化。

6、为后端添加车辆运动状态算法,若连续识别的两个帧同一目标静止,并且识别为违停,则判断为违停;若非静止但识别违停,不做处理。相对静止判断?无人机在动,无法建立参考系。

7、进行车牌识别。

 

代码及粗略解释:

detection_consumer.py:

此处代码使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的类进行帧识别和计数,然后判断计数,调用讯飞星火 API 生成 AI 答复,获取关键帧,将所有信息封装成一个字典,并传给前端,以用于违停报告生成。同时,将实时识别帧通过 websocket 的消费者传给前端显示。

  1. import asyncio
  2. import base64
  3. import os
  4. import subprocess
  5. import time
  6. import traceback
  7. from datetime import datetime
  8. from pathlib import Path
  9. from .report_consumer import ReportConsumer
  10. import numpy as np
  11. import cv2
  12. from channels.generic.websocket import AsyncWebsocketConsumer
  13. from channels.layers import get_channel_layer
  14. from app01.services.detection_engine import DetectionEngine, logger
  15. from concurrent.futures import ThreadPoolExecutor
  16. from app01.utils import rename_time_folder # 导入重命名函数
  17. from app01.stream_status import stream_status, status_lock # 从新模块导入
  18. import requests
  19. RTMP_URL = “rtmp://127.0.0.1:1935/live/stream”
  20. channel_layer = get_channel_layer()
  21. class DetectionStreamConsumer(AsyncWebsocketConsumer):
  22. async def connect(self):
  23. await self.channel_layer.group_add(“detection_group”, self.channel_name)
  24. await self.accept()
  25. print(“WebSocket 连接建立成功”)
  26. async def disconnect(self, code):
  27. await self.channel_layer.group_discard(“detection_group”, self.channel_name)
  28. print(“WebSocket 连接关闭”)
  29. async def start_detection(self):
  30. print(“开始检测帧流…”)
  31. await self.monitor_stream_status()
  32. async def monitor_stream_status(self):
  33. while True:
  34. if stream_status.is_streaming:
  35. logger.info(“检测到推流,开始处理…”)
  36. await self.start_detection_stream()
  37. else:
  38. logger.info(“当前无推流,等待中…”)
  39. await asyncio.sleep(1)
  40. async def start_detection_stream(self, all_dict=None):
  41. ffmpeg_pull_command = [
  42. “ffmpeg”,
  43. “-c:v”, “h264”,
  44. “-i”, RTMP_URL,
  45. “-f”, “rawvideo”,
  46. “-pix_fmt”, “bgr24”,
  47. “-s”, “640×360”,
  48. “-r”, “5”,
  49. “-vcodec”, “rawvideo”,
  50. “-an”,
  51. “-flags”, “+low_delay”,
  52. “-tune”, “zerolatency”,
  53. “-“
  54. ]
  55. try:
  56. process = subprocess.Popen(
  57. ffmpeg_pull_command,
  58. stdout=subprocess.PIPE,
  59. stderr=subprocess.DEVNULL,
  60. bufsize=10 * 8
  61. )
  62. except Exception as e:
  63. logger.error(f”FFmpeg 子进程启动失败: {e})
  64. return
  65. if process.stdout is None:
  66. logger.error(“FFmpeg stdout 为空”)
  67. return
  68. frame_width, frame_height = 640, 360
  69. frame_size = frame_width * frame_height * 3
  70. executor = ThreadPoolExecutor(max_workers=4) # 最多可以同时运行 4 个线程
  71. engine = DetectionEngine()
  72. frame_count = 0
  73. skip_interval = 1
  74. try:
  75. while stream_status.is_streaming:
  76. try:
  77. loop = asyncio.get_event_loop()
  78. raw_frame = await asyncio.wait_for(
  79. loop.run_in_executor(executor, process.stdout.read, frame_size),
  80. timeout=8 # 超时检测断流
  81. )
  82. except asyncio.TimeoutError:
  83. logger.warning(“读取帧超时,触发流结束”)
  84. with status_lock:
  85. stream_status.is_streaming = False
  86. break
  87. if len(raw_frame) != frame_size:
  88. continue
  89. try:
  90. frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))
  91. except Exception as e:
  92. logger.warning(f”帧解析失败: {e})
  93. continue
  94. frame_count += 1
  95. if frame_count % skip_interval != 0:
  96. continue
  97. result = await loop.run_in_executor(executor, engine.process_frame, frame)
  98. # 增加空值检查
  99. if result is None:
  100. logger.warning(“检测结果为空,跳过处理”)
  101. continue
  102. processed_frame, detected_classes = result
  103. # 更新车辆统计(仅按类别)
  104. with DetectionEngine.counter_lock:
  105. for class_id in detected_classes:
  106. if class_id == 0:
  107. DetectionEngine.total_count[‘car’] += 1
  108. DetectionEngine.total_count[‘total’] += 1
  109. elif class_id == 1:
  110. DetectionEngine.total_count[‘bus’] += 1
  111. DetectionEngine.total_count[‘total’] += 1
  112. elif class_id == 2:
  113. DetectionEngine.total_count[‘truck’] += 1
  114. DetectionEngine.total_count[‘total’] += 1
  115. elif class_id == 3:
  116. DetectionEngine.total_count[‘van’] += 1
  117. DetectionEngine.total_count[‘total’] += 1
  118. _, jpeg = cv2.imencode(‘.jpg’, processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])
  119. if channel_layer:
  120. await channel_layer.group_send(
  121. “detection_group”,
  122. {“type”: “send_frame”, “frame”: jpeg.tobytes()}
  123. )
  124. except Exception as e:
  125. logger.error(f”检测处理错误: {e})
  126. logger.error(traceback.format_exc())
  127. finally:
  128. logger.warning(“流结束,进入处理…”)
  129. if process.returncode is None:
  130. process.kill()
  131. logger.warning(“FFmpeg 进程已终止”)
  132. # 延迟一小段时间,确保文件操作完全释放
  133. time.sleep(1)
  134. stream_status.is_streaming = False
  135. stream_status.end_datetime = datetime.now() # 记录结束时间
  136. logger.warning(“正在获取三个关键帧…”)
  137. # 找3个关键帧,必须在重命名前执行
  138. folder = Path(f”{stream_status.image_path})
  139. files = [f for f in folder.iterdir() if f.is_file()]
  140. if not files:
  141. return None, None, None
  142. # 按修改时间排序
  143. files.sort(key=lambda x: x.stat().st_mtime)
  144. first = files[0]
  145. last = files[-1]
  146. middle = files[len(files) // 2]
  147. # 转换为Base64
  148. stream_status.image_li = [
  149. self.image_to_base64(str(f)) # 传入字符串路径,避免Path对象序列化问题
  150. for f in [first, middle, last]
  151. ]
  152. logger.warning(“正在重命名文件夹…”)
  153. # 先复制 time_path,因为后续可能被修改
  154. time_path = stream_status.time_path
  155. start_datetime = stream_status.start_datetime
  156. end_datetime = stream_status.end_datetime
  157. save_path = “”
  158. # 调用重命名函数(使用完整 datetime)
  159. if time_path and start_datetime:
  160. path = rename_time_folder(
  161. time_path,
  162. start_datetime, # 传入开始时间
  163. end_datetime # 传入结束时间
  164. )
  165. save_path = path
  166. logger.warning(
  167. f”文件夹已重命名为 {start_datetime.strftime(‘%H时%M分%S秒’)}{end_datetime.strftime(‘%H时%M分%S秒’)})
  168. try:
  169. logger.warning(“正在更新数据…”)
  170. # 更新识别结果
  171. stats = self.correct_overcounted_stats(DetectionEngine.total_count)
  172. print(“stats:”, stats)
  173. stream_status.total_stats = stats
  174. print(“stream_status.total_stats:”, stream_status.total_stats)
  175. self.save_vehicle_stats(stats, save_path)
  176. except Exception as e:
  177. logger.error(f”保存统计信息失败: {e})
  178. print(“正在获取AI答复…”)
  179. self.AI_response()
  180. # 定义总数据,并序列化
  181. all_dict = {
  182. ‘datetime’: stream_status.start_time.strftime(“%Y-%m-%d %H:%M:%S.%f”), # 序列化,转为字符串
  183. ‘image_li’: [bs for bs in stream_status.image_li], # 关键帧转二进制
  184. ‘detect_car’: [_ for _ in stream_status.total_stats.values()],
  185. ‘Al_response’: stream_status.AI_talk,
  186. }
  187. print(“all_dict:”, all_dict)
  188. print(“正在发送信息…”)
  189. await ReportConsumer.send_report_data(all_dict)
  190. executor.shutdown(wait=False)
  191. DetectionEngine.reset_stats()
  192. stream_status.reset_all_dict()
  193. def save_vehicle_stats(self, stats, save_path=None):
  194. # 如果未提供路径,使用默认的 time_path
  195. if not save_path:
  196. save_path = stream_status.time_path
  197. if not save_path:
  198. logger.warning(“无法保存统计信息:路径为空”)
  199. return
  200. stats_path = os.path.join(save_path, ‘vehicle_stats.txt’)
  201. try:
  202. with open(stats_path, ‘w’, encoding=‘utf-8-sig’) as f:
  203. f.write(“车辆统计结果\n”)
  204. f.write(“————————-\n”)
  205. for k, v in stats.items():
  206. f.write(f”{k}: {v}\n”)
  207. f.write(“————————-\n”)
  208. logger.info(f”统计信息已保存至 {stats_path})
  209. except Exception as e:
  210. logger.error(f”写入统计信息失败: {e})
  211. def AI_response(self):
  212. “””获取AI回复”””
  213. url = “https://spark-api-open.xf-yun.com/v1/chat/completions”
  214. headers = {
  215. “Authorization”: “Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv”,
  216. “Content-Type”: “application/json”,
  217. }
  218. content = (
  219. f’我现在要做一个无人机城市交通智慧巡检报告,现在巡检结果是:巡检时间:{stream_status.start_time}
  220. f’小汽车有{stream_status.total_stats[“car”]}辆,面包车有{stream_status.total_stats[“van”]}辆,’
  221. f’公交车有{stream_status.total_stats[“bus”]}辆,卡车有{stream_status.total_stats[“truck”]}辆,’
  222. f’识别到的违停车辆共有{stream_status.total_stats[“illegally_car”]}辆。’
  223. f’帮我生成一段结论,要求不要有废话,也不要写具体那四个类别识别到的车辆数,字数200字,语言精炼严谨’
  224. )
  225. question = {
  226. “role”: “user”,
  227. “content”: content
  228. }
  229. data = {
  230. “max_tokens”: 4096,
  231. “top_k”: 4,
  232. “messages”: [question],
  233. “temperature”: 0.5,
  234. “model”: “4.0Ultra”,
  235. “stream”: False,
  236. }
  237. response = requests.post(url, headers=headers, json=data)
  238. response.raise_for_status()
  239. result = response.json()
  240. # 提取模型输出的内容
  241. reply = result.get(‘choices’, [{}])[0].get(‘message’, {}).get(‘content’, ).strip()
  242. print(reply)
  243. stream_status.AI_talk = reply
  244. # 读取图片文件并转换为 Base64
  245. @staticmethod # 静态方法不应接收self参数
  246. def image_to_base64(image_path):
  247. with open(image_path, “rb”) as img_file:
  248. base64_str = base64.b64encode(img_file.read()).decode(“utf-8”)
  249. return f”data:image/jpeg;base64,{base64_str}
  250. @staticmethod # 去掉self第一个参数
  251. def correct_overcounted_stats(stats, avg_appearance=6):
  252. “””
  253. 简单修正严重重复统计的结果
  254. “””
  255. corrected = {}
  256. for cls in [‘car’, ‘van’, ‘bus’, ‘truck’]:
  257. if cls in stats:
  258. corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))
  259. # 可选:重新计算 total
  260. corrected[‘total’] = sum(corrected.values())
  261. corrected[‘illegally_car’] = int(corrected[‘total’] / 100)
  262. print(corrected)
  263. return corrected
  264. async def send_frame(self, event):
  265. frame_bytes = event[“frame”]
  266. await self.send(bytes_data=frame_bytes)

    stream_status.py:

    本文件用于 Django 项目全局状态管理,相当于 vue3 中的 pinia , is_streaming 是前端推本地视频、后端上传 SRS 后,变为 True ,后端因为 is_streaming = True 而开始拉流,当超过 6 秒未拉到流,判断流结束,is_streaming = False。还定义了全局状态锁、状态重置方法、时间获取方法、文件夹名获取方法、状态实例。全局状态锁是为了防止多线程修改同一个共享变量,只有with status_lock:在 with 块内,只有当前线程持有锁时才能运行代码,其他线程必须等待,相当于在此处把锁”锁“起来了。

    1. import threading
    2. # 全局状态锁和状态对象
    3. status_lock = threading.Lock()
    4. class StreamStatus:
    5. def __init__(self):
    6. self.is_streaming = False # 是否正在推流/检测
    7. self.start_time = None # 推流开始时间(datetime对象)
    8. self.end_time = None # 推流结束时间(datetime对象)
    9. self.time_path = None # 当前时间文件夹路径(如 Media/2025-06-01/12点)
    10. self.total_stats = { # 车辆统计
    11. “car”: 0,
    12. “van”: 0,
    13. “bus”: 0,
    14. “truck”: 0,
    15. “total”: 0,
    16. “illegally_car”: 0,
    17. }
    18. self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
    19. self.image_path = None
    20. self.image_li = []
    21. self.AI_talk = None
    22. self.all_dict = {}
    23. @property
    24. def date_folder(self):
    25. “””获取日期文件夹名称(如 2025-06-01)”””
    26. if self.start_time:
    27. return self.start_time.strftime(“%Y-%m-%d”)
    28. return None
    29. @property
    30. def start_hour(self):
    31. “””获取开始小时(如 12)”””
    32. if self.start_time:
    33. return self.start_time.hour
    34. return None
    35. def reset(self):
    36. “””重置状态(用于新任务开始)”””
    37. self.is_streaming = False
    38. self.start_time = None
    39. self.end_time = None
    40. self.time_path = None
    41. self.total_stats = {k: 0 for k in self.total_stats}
    42. self.last_frame_time = None
    43. def reset_all_dict(self):
    44. self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)
    45. self.image_path = None
    46. self.image_li = []
    47. self.AI_talk = None
    48. self.all_dict = {}
    49. self.all_dict = {}
    50. # 全局唯一的状态实例
    51. stream_status = StreamStatus()

     

    python

    detection_engine.py:

    此处是 detection_consumer.py 使用的服务代码块,用于为其提供帧识别服务,consumer 拉到的每一个流都会调用该类中的 process_frame 类方法,并同时进行计数和关键帧保存

    1. import threading
    2. import time
    3. import cv2
    4. from ultralytics import YOLO
    5. import logging
    6. import os
    7. from app01.stream_status import stream_status, status_lock # 从新模块导入
    8. logger = logging.getLogger(__name__)
    9. MODEL_PATH = “F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt”
    10. model = YOLO(MODEL_PATH)
    11. # 关键帧存储位置
    12. pictures_dir = “pictures”
    13. if not os.path.exists(pictures_dir):
    14. os.makedirs(pictures_dir)
    15. class DetectionEngine:
    16. frame_counter = 1
    17. counter_lock = threading.Lock() # 线程锁,保证计数器安全
    18. # 新增:全局计数器和已统计 ID
    19. total_count = {
    20. ‘car’: 0,
    21. ‘van’: 0,
    22. ‘bus’: 0,
    23. ‘truck’: 0,
    24. “total”: 0,
    25. “illegally_car”: 0
    26. }
    27. # 下次任务重置
    28. @classmethod
    29. def reset_stats(cls):
    30. with cls.counter_lock:
    31. for k in cls.total_count:
    32. cls.total_count[k] = 0
    33. # 下次任务重置
    34. @classmethod
    35. def update_stats(cls, vehicle_type):
    36. with cls.counter_lock:
    37. if vehicle_type in cls.total_count:
    38. cls.total_count[vehicle_type] += 1
    39. cls.total_count[“total”] += 1
    40. def __init__(self):
    41. self.model = model
    42. # 调整检测参数,减少计算量
    43. self.detect_params = {
    44. ‘conf’: 0.3, # 提高置信度阈值,减少无效检测
    45. ‘iou’: 0.5, # 调整 NMS 阈值
    46. ‘imgsz’: [384, 640], # 输入尺寸,降低分辨率
    47. ‘classes’: [0, 1, 2, 3], # 仅检测车辆类别(2: car, 5: bus, 7: truck)
    48. ‘device’: 0, # 使用 GPU
    49. ‘half’: True, # 使用 FP16 精度
    50. ‘show’: False # 关闭实时显示,减少开销
    51. }
    52. def process_frame(self, frame):
    53. “””处理单帧图像:YOLO推理 + 保存结果到关键帧图片文件夹”””
    54. results = model(frame)
    55. detected_objects = []
    56. annotated_frame = frame.copy() # 复制原始帧以防止修改原图
    57. # 防御性检查:确保 results 非空且包含有效数据
    58. if not results or len(results) == 0:
    59. logger.warning(“未获取到检测结果,跳过处理”)
    60. return frame, []
    61. boxes = results[0].boxes
    62. if boxes is None or len(boxes) == 0:
    63. logger.warning(“检测结果为空,跳过处理”)
    64. return frame, []
    65. try:
    66. # 假设类别信息存储在 cls 属性中
    67. classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, ‘cls’) and boxes.cls is not None else []
    68. for class_id in classes:
    69. detected_objects.append(class_id)
    70. annotated_frame = results[0].plot() # 绘制检测框
    71. except Exception as e:
    72. logger.error(f”解析检测数据失败: {e})
    73. return annotated_frame, []
    74. with status_lock:
    75. car_count = sum(1 for cls in detected_objects if cls == 0)
    76. if car_count > 18:
    77. try:
    78. timestamp = int(time.time())
    79. save_path = os.path.join(stream_status.image_path, f”keyframe_{timestamp}.jpg”)
    80. cv2.imwrite(save_path, annotated_frame)
    81. logger.info(f”关键帧保存成功: {save_path})
    82. except Exception as e:
    83. logger.error(f”保存关键帧失败: {e})
    84. return annotated_frame, detected_objects

     

    repoet_consumer.py:

    用于将前面 detection_consumer.py 中的字典数据发送给前端,首先会将数据缓存到后端,等到前端消费者建立成功,立马发送出去。

    1. import asyncio
    2. from channels.generic.websocket import AsyncWebsocketConsumer
    3. from channels.layers import get_channel_layer
    4. import json
    5. # 缓存队列:用于保存尚未发送的数据
    6. queue = []
    7. # 记录是否有消费者连接
    8. has_consumer = False
    9. class ReportConsumer(AsyncWebsocketConsumer):
    10. async def connect(self):
    11. global has_consumer
    12. # 加入指定组
    13. await self.channel_layer.group_add(“report_group”, self.channel_name)
    14. await self.accept()
    15. print(“巡检报告WebSocket 已连接”)
    16. # 设置有消费者连接的标志
    17. has_consumer = True
    18. # 尝试发送缓存数据
    19. await flush_queue(self.channel_layer)
    20. async def disconnect(self, close_code):
    21. global has_consumer
    22. # 移除组成员
    23. await self.channel_layer.group_discard(“report_group”, self.channel_name)
    24. print(“巡检报告WebSocket 断开连接”)
    25. # 重置消费者连接标志
    26. has_consumer = False
    27. async def receive(self, text_data=None, bytes_data=None):
    28. pass
    29. async def send_report(self, event):
    30. “””处理 send_report 类型的消息并发送给客户端”””
    31. data = event[‘data’]
    32. print(“【发送到客户端】”, data)
    33. # 转换为JSON字符串发送
    34. try:
    35. # 处理可能包含的非JSON可序列化对象
    36. # 这里假设 image_li 包含 Path 对象,需要转换为字符串
    37. if ‘image_li’ in data:
    38. data[‘image_li’] = [str(path) for path in data[‘image_li’]]
    39. await self.send(text_data=json.dumps(data))
    40. except Exception as e:
    41. print(f”【发送到客户端失败】{e})
    42. @classmethod
    43. async def send_report_data(cls, data):
    44. “””供其他模块调用的方法”””
    45. global queue, has_consumer
    46. # 不再检查是否有人连接,直接发送或缓存,生产模式用Redis
    47. channel_layer = get_channel_layer()
    48. # 如果没有消费者连接,将数据加入队列
    49. if not has_consumer:
    50. queue.append(data)
    51. print(f”【数据已缓存】等待消费者连接: {data})
    52. else:
    53. # 有消费者连接,直接发送
    54. await send_report(channel_layer, data)
    55. # ========== 全局函数 ==========
    56. # 定期检查并发送缓存数据的任务
    57. async def queue_check_task():
    58. channel_layer = get_channel_layer()
    59. while True:
    60. await asyncio.sleep(1) # 每秒检查一次
    61. if queue and has_consumer:
    62. await flush_queue(channel_layer)
    63. # 在应用启动时启动队列检查任务
    64. async def start_queue_check():
    65. asyncio.create_task(queue_check_task())
    66. async def send_report(channel_layer, data):
    67. try:
    68. await channel_layer.group_send(
    69. “report_group”,
    70. {
    71. “type”: “send_report”,
    72. “data”: data,
    73. },
    74. )
    75. print(“【发送成功】”, data)
    76. except Exception as e:
    77. print(f”【发送失败】{e})
    78. async def flush_queue(channel_layer):
    79. global queue
    80. if not queue:
    81. return
    82. items = list(queue)
    83. queue.clear()
    84. for data in items:
    85. await send_report(channel_layer, data)

     

    routings.py:

    用于定义 websocket 的后端接收路径,并连接消费者

    1. from django.urls import re_path
    2. from app01.consumers import detection_consumer, report_consumer
    3. websocket_urlpatterns = [
    4. re_path(r’^ws/detection/$’, detection_consumer.DetectionStreamConsumer.as_asgi()),
    5. re_path(r’^ws/report/$’, report_consumer.ReportConsumer.as_asgi()),
    6. ]

     

    urls.py:

    用于接收前端发送的 http 请求,并与 view.py 中的视图函数建立联系

    1. “””
    2. URL configuration for DJI_yolo project.
    3. The `urlpatterns` list routes URLs to views. For more information please see:
    4. https://docs.djangoproject.com/en/5.1/topics/http/urls/
    5. Examples:
    6. Function views
    7. 1. Add an import: from my_app import views
    8. 2. Add a URL to urlpatterns: path(”, views.home, name=’home’)
    9. Class-based views
    10. 1. Add an import: from other_app.views import Home
    11. 2. Add a URL to urlpatterns: path(”, Home.as_view(), name=’home’)
    12. Including another URLconf
    13. 1. Import the include() function: from django.urls import include, path
    14. 2. Add a URL to urlpatqterns: path(‘blog/’, include(‘blog.urls’))
    15. “””
    16. from django.conf.urls.static import static
    17. from django.contrib import admin
    18. from django.urls import path
    19. from DJI_yolo import settings
    20. from app01 import views
    21. urlpatterns = [
    22. path(“admin/”, admin.site.urls),
    23. path(‘csrf/’, views.get_csrf_token, name=‘get_csrf_token’),
    24. # 推流测试
    25. path(‘push_stream/’, views.push_stream, name=‘push_stream’),
    26. # 媒体库管理
    27. path(‘api4/date-folders/’, views.get_date_folders, name=‘date-folders’),
    28. path(‘api4/time-folders/<str:date>/’, views.get_time_folders, name=‘time-folders’),
    29. path(‘api4/files/<str:date>/<str:time_range>/<str:category>/count/’, views.get_file_count, name=‘file-count’),
    30. path(‘api4/files/<str:date>/<str:time_range>/<str:category>/’, views.get_files, name=‘files’),
    31. path(‘api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/’, views.delete_file,
    32. name=‘delete-file’),
    33. path(‘delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>’, views.delete_file,
    34. name=‘delete-file’), # 添加一个用于删除文件的 URL 路径
    35. ] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

     

    views.py:

    用于处理前端请求,这里报告推流、媒体管理系统、原始视频存储逻辑等等。

    1. # views.py
    2. from datetime import datetime
    3. from pathlib import Path
    4. import urllib.parse
    5. from django.middleware.csrf import get_token
    6. import os
    7. import subprocess
    8. import threading
    9. from django.http import JsonResponse
    10. from django.conf import settings
    11. import asyncio
    12. from .consumers.detection_consumer import DetectionStreamConsumer
    13. from .utils import create_date_folder, create_time_folder
    14. from .stream_status import stream_status
    15. MEDIA_ROOT = Path(settings.MEDIA_ROOT)
    16. def get_csrf_token(request):
    17. token = get_token(request)
    18. print(token)
    19. return JsonResponse({‘csrfToken’: token})
    20. def push_stream(request):
    21. print(“前端返回本地视频,准备推流”)
    22. video_file = request.FILES.get(‘video’)
    23. if not video_file:
    24. return JsonResponse({‘error’: ‘未上传视频文件’}, status=400)
    25. # ——————- 创建精确时间文件夹 ——————-
    26. start_datetime = datetime.now() # 记录精确到秒的开始时间
    27. date_str = start_datetime.strftime(‘%Y-%m-%d’)
    28. date_path = create_date_folder(date_str)
    29. stream_status.start_time = datetime.now()
    30. # 创建带时分秒的文件夹(如 15:30:45)
    31. time_path, time_folder = create_time_folder(date_path, start_datetime)
    32. # 创建三类子文件夹
    33. original_path = os.path.join(time_path, ‘原视频’)
    34. recognized_path = os.path.join(time_path, ‘识别视频’)
    35. image_path = os.path.join(time_path, ‘关键帧图片’)
    36. stream_status.image_path = image_path # 保存到全局状态
    37. for folder in [original_path, recognized_path, image_path]:
    38. os.makedirs(folder, exist_ok=True)
    39. # ——————- 保存上传视频 ——————-
    40. original_video_name = f”original_{start_datetime.strftime(‘%H%M%S’)}.mp4″
    41. original_video_path = os.path.join(original_path, original_video_name)
    42. with open(original_video_path, ‘wb’) as f:
    43. for chunk in video_file.chunks():
    44. f.write(chunk)
    45. # ——————- 推流配置 ——————-
    46. push_url = “rtmp://127.0.0.1:1935/live/stream”
    47. recognized_video_name = f”recognized_{start_datetime.strftime(‘%H%M%S’)}.mp4″
    48. recognized_video_path = os.path.join(recognized_path, recognized_video_name)
    49. command = [
    50. “ffmpeg”,
    51. “-re”,
    52. “-i”, original_video_path,
    53. “-c:v”, “libx264”,
    54. “-preset”, “ultrafast”,
    55. “-tune”, “zerolatency”,
    56. “-g”, “50”,
    57. “-r”, “30”,
    58. “-an”,
    59. “-f”, “flv”,
    60. push_url,
    61. “-f”, “mp4”,
    62. recognized_video_path
    63. ]
    64. try:
    65. process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    66. # 保存完整时间状态
    67. stream_status.is_streaming = True
    68. stream_status.start_datetime = start_datetime # 保存完整 datetime
    69. stream_status.time_path = time_path
    70. stream_status.recognized_path = recognized_path
    71. stream_status.image_path = image_path
    72. detection_thread = threading.Thread(
    73. target=run_detection_in_background,
    74. daemon=True
    75. )
    76. detection_thread.start()
    77. return JsonResponse({
    78. ‘status’: ‘success’,
    79. ‘date_folder’: date_str,
    80. ‘time_folder’: time_folder # 格式如 15:30:45
    81. })
    82. except Exception as e:
    83. return JsonResponse({‘error’: str(e)}, status=500)
    84. def run_detection_in_background():
    85. try:
    86. loop = asyncio.new_event_loop()
    87. asyncio.set_event_loop(loop)
    88. loop.run_until_complete(DetectionStreamConsumer().start_detection())
    89. except Exception as e:
    90. print(f”后台检测线程异常: {e})
    91. def get_date_folders(request):
    92. page = int(request.GET.get(‘page’, 1))
    93. page_size = int(request.GET.get(‘page_size’, 10))
    94. MEDIA_ROOT_PATH = Path(r’F:\FullStack\Django\DJI_yolo\media’)
    95. date_folders = []
    96. for name in os.listdir(MEDIA_ROOT_PATH):
    97. folder_path = MEDIA_ROOT_PATH / name
    98. if folder_path.is_dir() and len(name.split(‘-‘)) == 3:
    99. date_folders.append({‘date’: name})
    100. date_folders.sort(key=lambda x: x[‘date’], reverse=True)
    101. start = (page – 1) * page_size
    102. end = start + page_size
    103. return JsonResponse({
    104. ‘data’: date_folders[start:end],
    105. ‘total’: len(date_folders)
    106. })
    107. def get_time_folders(request, date):
    108. date_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date
    109. if not date_path.is_dir():
    110. return JsonResponse({‘error’: ‘日期文件夹不存在’}, status=404)
    111. page = int(request.GET.get(‘page’, 1))
    112. page_size = int(request.GET.get(‘page_size’, 10))
    113. time_folders = []
    114. for name in os.listdir(date_path):
    115. time_path = date_path / name
    116. if time_path.is_dir():
    117. original_count = len(os.listdir(time_path / ‘原视频’)) if (time_path / ‘原视频’).is_dir() else 0
    118. recognized_count = len(os.listdir(time_path / ‘识别视频’)) if (time_path / ‘识别视频’).is_dir() else 0
    119. image_count = len(os.listdir(time_path / ‘关键帧图片’)) if (time_path / ‘关键帧图片’).is_dir() else 0
    120. time_folders.append({
    121. ‘time_range’: name, # 直接使用时间区间名称
    122. ‘original_count’: original_count,
    123. ‘recognized_count’: recognized_count,
    124. ‘image_count’: image_count
    125. })
    126. time_folders.sort(key=lambda x: x[‘time_range’], reverse=True)
    127. start = (page – 1) * page_size
    128. end = start + page_size
    129. return JsonResponse({
    130. ‘data’: time_folders[start:end],
    131. ‘total’: len(time_folders)
    132. })
    133. def get_file_count(request, date, time_range, category):
    134. category_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / time_range / category
    135. if not category_path.is_dir():
    136. return JsonResponse({‘count’: 0})
    137. count = len(os.listdir(category_path))
    138. return JsonResponse({‘count’: count})
    139. def get_files(request, date, time_range, category):
    140. try:
    141. decoded_time_range = urllib.parse.unquote(time_range)
    142. category_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / decoded_time_range / category
    143. if not category_path.is_dir():
    144. return JsonResponse({‘error’: ‘文件类别不存在’}, status=404)
    145. page = int(request.GET.get(‘page’, 1))
    146. page_size = int(request.GET.get(‘page_size’, 10))
    147. files = []
    148. for filename in os.listdir(category_path):
    149. file_path = category_path / filename
    150. if file_path.is_file():
    151. encoded_time_range = urllib.parse.quote(decoded_time_range)
    152. files.append({
    153. ‘name’: filename,
    154. ‘url’: f’http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}
    155. })
    156. start = (page – 1) * page_size
    157. end = start + page_size
    158. return JsonResponse({
    159. ‘data’: files[start:end],
    160. ‘total’: len(files),
    161. ‘status’: ‘success’
    162. })
    163. except Exception as e:
    164. print(f”文件列表错误: {str(e)})
    165. return JsonResponse({
    166. ‘error’: ‘服务器内部错误’,
    167. ‘detail’: str(e)
    168. }, status=500)
    169. def delete_file(request, date, time_range, category, filename):
    170. try:
    171. decoded_time_range = urllib.parse.unquote(time_range)
    172. file_path = Path(r’F:\FullStack\Django\DJI_yolo\media’) / date / decoded_time_range / category / filename
    173. if not file_path.exists():
    174. return JsonResponse({‘error’: ‘文件不存在’}, status=404)
    175. os.remove(file_path)
    176. return JsonResponse({‘status’: ‘success’})
    177. except Exception as e:
    178. return JsonResponse({‘error’: str(e)}, status=500)

     

    python

    运行

    web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告
收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,请不要用于商业用途!
3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需!
7. 如遇到加密压缩包,默认解压密码为"dtmb.taobao.com",如遇到无法解压的请联系管理员!
8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载
声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性

山喂资源分享 网页素材 web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告 https://www.023140.com/669.html

web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告
上一篇:

已经没有上一篇了!

常见问题
  • 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
查看详情
  • 最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。 若排除这种情况,可在对应资源底部留言,或 联络我们.。
查看详情

相关文章

官方客服团队

为您解决烦忧 - 24小时在线 专业服务