Recently our AI Team has been looking into how to use NVIDIA's sample for dynamically adding and removing sources. The requirement was to stop passing sources in from the command line and instead feed them from a list inside the program, while still being able to add more sources at runtime. I made a few changes to the sample to do this, but before going over what I changed, let's look at the sample as it stands.
DeepStream Samples Come in C/C++ and Python
First, note that DeepStream ships both C/C++ samples and Python samples. They are largely the same and are written with much the same logic. Since our company develops in Python, we'll use the runtime_source_add_delete folder in deepstream_python_apps, which is the sample for dynamically adding and removing input sources.
The Runtime Add/Delete Source Sample: runtime_source_add_delete
Running the Project
To get it running, first clone the project:
$ git clone https://github.com/NVIDIA-AI-IOT/deepstream_python_apps.git
$ cd deepstream_python_apps/apps/runtime_source_add_delete
Inside the folder, deepstream_rt_src_add_del.py is the entry script, and starting it takes just one command:
$ python3 deepstream_rt_src_add_del.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4
While it runs, you'll notice that every 10 s it adds another source; in this example that means another copy of file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4. Since MAX_NUM_SOURCES is declared as 4 at the top, once four sources are active it switches to deleting one every 10 s, and the program exits when the count reaches 0.
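The add-then-delete cadence described above can be sketched as a pure-Python simulation (a simplification for illustration; the real sample drives this with GLib timers, and `simulate_cadence` is a hypothetical helper, not part of the sample):

```python
MAX_NUM_SOURCES = 4  # same cap as the sample

def simulate_cadence(max_sources=MAX_NUM_SOURCES):
    """Return the sequence of active-source counts over one full run."""
    counts = []
    n = 0
    # every 10 s one source is added until the cap is reached
    while n < max_sources:
        n += 1
        counts.append(n)
    # then every 10 s one source is removed until none remain
    while n > 0:
        n -= 1
        counts.append(n)
    return counts
```

With the default cap of 4 this yields the ramp-up/ramp-down pattern the sample prints: 1, 2, 3, 4, then 3, 2, 1, 0.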
How It Works
Why It Can't Handle Multiple Sources
Now that we know how it behaves, we can look at how main drives the program. But first, one question may come up: every other sample can take multiple sources at once, so why not this one? Looking at L356, the uri is hard-coded, so only the first source argument is ever used:
uri = args[1]
And since add_sources(data) at L247 only ever uses that same variable, the program has no way to handle multiple sources:
source_bin = create_uridecode_bin(source_id, uri)
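One way around this limitation would be to collect every command-line argument as a URI and cycle through them when adding sources. This is a hedged sketch of that idea; `collect_uris` and `uri_cycler` are hypothetical helpers, not part of the NVIDIA sample:

```python
import itertools
import sys

def collect_uris(argv):
    """Treat every argument after the script name as a source URI."""
    if len(argv) < 2:
        raise SystemExit("usage: %s <uri1> [uri2 ...]" % argv[0])
    return argv[1:]

def uri_cycler(uris):
    """Cycle through the URIs so each add_sources() call can draw the next one."""
    return itertools.cycle(uris)
```

add_sources could then call `next(cycler)` instead of reusing the single hard-coded uri.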
Pipeline Order
The main body simply chains the individual elements together into a pipeline, in this order:
sourcebin -> streammux -> nvinfer -> nvtracker -> nvdsanalytics -> nvtiler -> nvvideoconvert -> nvdsosd -> (if aarch64, transform ->) sink
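The only platform-dependent part of this chain is the nvegltransform stage inserted before the sink on Jetson (aarch64). That branching can be expressed as a small helper (an illustrative sketch, not code from the sample):

```python
# Element chain from the sample's linking comment.
PIPELINE_ORDER = ["sourcebin", "streammux", "nvinfer", "nvtracker",
                  "nvdsanalytics", "nvtiler", "nvvideoconvert",
                  "nvdsosd", "sink"]

def link_order(aarch64=False):
    """Return the chain of element names, adding nvegltransform
    immediately before the sink when running on Jetson."""
    order = list(PIPELINE_ORDER)
    if aarch64:
        order.insert(order.index("sink"), "nvegltransform")
    return order
```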
Program Walkthrough
At L541 the program schedules add_sources to run every 10 s:
GObject.timeout_add_seconds(10, add_sources, g_source_bin_list)
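The key to this pattern is GLib's timeout semantics: the callback keeps firing on the interval while it returns True, and is unscheduled as soon as it returns False. A pure-Python model of that contract (the `run_timeouts` driver is an illustrative stand-in for the GLib main loop, not a real API):

```python
def run_timeouts(callback, max_ticks=100):
    """Drive `callback` the way a GLib timeout would: repeat while it
    returns True, stop once it returns False. Returns the tick count."""
    ticks = 0
    while ticks < max_ticks:
        ticks += 1
        if not callback():
            break
    return ticks

state = {"count": 0}

def add_until_four():
    """Mimics add_sources: keep going until MAX_NUM_SOURCES is hit."""
    state["count"] += 1
    return state["count"] < 4  # False on the 4th call unschedules the timer
```

This is exactly why add_sources returns False once the source cap is reached: returning False cancels the add timer, handing control over to the delete timer.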
So every 10 s a new source is added. At L237–L239 it checks whether the randomly chosen source slot is already enabled; if it is, it keeps picking until it finds a free one:
source_id = random.randrange(0, MAX_NUM_SOURCES)
while (g_source_enabled[source_id]):
    source_id = random.randrange(0, MAX_NUM_SOURCES)
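This rejection-sampling loop is safe here only because the sample guarantees a free slot exists before calling it; if every slot were enabled, it would spin forever. A deterministic alternative (a sketch of my own, not from the sample; `first_free_slot` is a hypothetical helper) avoids that edge case:

```python
def first_free_slot(enabled):
    """Return the index of the first disabled slot in the
    g_source_enabled-style list, or None if all slots are in use."""
    for i, in_use in enumerate(enabled):
        if not in_use:
            return i
    return None
```

The trade-off is that the random choice spreads new streams across streammux sink pads, while the scan always reuses the lowest free pad first.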
At L277–L279, once the count reaches MAX_NUM_SOURCES, it schedules delete_sources to start cleaning the sources up:
if (g_num_sources == MAX_NUM_SOURCES):
    GObject.timeout_add_seconds(10, delete_sources, g_source_bin_list)
    return False
Inside delete_sources, L199–L200 walks the source list in order, checking each stream for both end-of-stream and enabled status before releasing its resources:
for source_id in range(MAX_NUM_SOURCES):
    if (g_eos_list[source_id] and g_source_enabled[source_id]):
        g_source_enabled[source_id] = False
        stop_release_source(source_id)
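The sweep above can be modeled in isolation to see its bookkeeping effect: it disables every slot that has both reached EOS and is still enabled. This is a self-contained sketch (`sweep_finished` is a hypothetical name for illustration):

```python
def sweep_finished(eos, enabled):
    """Disable every slot that both reached EOS and is currently enabled,
    mutating `enabled` in place; return the slot ids that were released."""
    released = []
    for source_id in range(len(enabled)):
        if eos[source_id] and enabled[source_id]:
            enabled[source_id] = False
            released.append(source_id)
    return released
```

Note that a stream which reached EOS while already disabled is skipped, which prevents releasing the same streammux pad twice.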
L205–L208 and L221–L224 then make sure the program exits once every source has finished playing:
if (g_num_sources == 0):
    loop.quit()
    print("All sources stopped quitting")
    return False
That covers the basics of how the program works.
Modifying runtime_source_add_delete to Use an Internal List
As mentioned above, I modified the sample to take its sources from a list inside the program. Feel free to reference my changes. Gist
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from gi.repository import GLib
from ctypes import *
import time
import math
import random
import platform
from common.is_aarch_64 import is_aarch64
import pyds

MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
GPU_ID = 0
MAX_NUM_SOURCES = 4
SINK_ELEMENT = "nveglglessink"
PGIE_CONFIG_FILE = "dstest_pgie_config.txt"
TRACKER_CONFIG_FILE = "dstest_tracker_config.txt"
SGIE1_CONFIG_FILE = "dstest_sgie1_config.txt"
SGIE2_CONFIG_FILE = "dstest_sgie2_config.txt"
SGIE3_CONFIG_FILE = "dstest_sgie3_config.txt"

CONFIG_GPU_ID = "gpu-id"
CONFIG_GROUP_TRACKER = "tracker"
CONFIG_GROUP_TRACKER_WIDTH = "tracker-width"
CONFIG_GROUP_TRACKER_HEIGHT = "tracker-height"
CONFIG_GROUP_TRACKER_LL_CONFIG_FILE = "ll-config-file"
CONFIG_GROUP_TRACKER_LL_LIB_FILE = "ll-lib-file"
CONFIG_GROUP_TRACKER_ENABLE_BATCH_PROCESS = "enable-batch-process"

g_num_sources = 0
g_source_id_list = [0] * MAX_NUM_SOURCES
g_eos_list = [False] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_source_bin_list = [None] * MAX_NUM_SOURCES

# Internal source list: the URIs to play at start-up. Kept in a separate
# list so that g_source_bin_list only ever holds source bins.
SOURCE_URI_LIST = [
    "file:///opt/nvidia/deepstream/deepstream-5.1/sources/python/apps/deepstream-nvdsanalytics/Car_stream.h264",
    "file:///opt/nvidia/deepstream/deepstream-5.1/sources/python/apps/deepstream-nvdsanalytics/People_stream.h264",
]

pgie_classes_str = ["Vehicle", "TwoWheeler", "Person", "RoadSign"]

loop = None
pipeline = None
streammux = None
sink = None
pgie = None
sgie1 = None
sgie2 = None
sgie3 = None
nvvideoconvert = None
nvosd = None
tiler = None
tracker = None


def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)
    if name.find("nvv4l2decoder") != -1:
        if is_aarch64():
            Object.set_property("enable-max-performance", True)
            Object.set_property("drop-frame-interval", 0)
            Object.set_property("num-extra-surfaces", 0)
        else:
            Object.set_property("gpu_id", GPU_ID)


def cb_newpad(decodebin, pad, data):
    global streammux
    print("In cb_newpad\n")
    caps = pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if gstname.find("video") != -1:
        source_id = data
        pad_name = "sink_%u" % source_id
        print(pad_name)
        # Get a sink pad from the streammux, link to decodebin
        sinkpad = streammux.get_request_pad(pad_name)
        if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
            print("Decodebin linked to pipeline")
        else:
            sys.stderr.write("Failed to link decodebin to pipeline\n")


def create_uridecode_bin(index, filename):
    global g_source_id_list
    print("Creating uridecodebin for [%s]" % filename)
    # Create a source GstBin to abstract this bin's content from the rest of
    # the pipeline
    g_source_id_list[index] = index
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    bin = Gst.ElementFactory.make("uridecodebin", bin_name)
    if not bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    bin.set_property("uri", filename)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    bin.connect("pad-added", cb_newpad, g_source_id_list[index])
    bin.connect("child-added", decodebin_child_added, g_source_id_list[index])
    # Set status of the source to enabled
    g_source_enabled[index] = True
    return bin


def stop_release_source(source_id):
    global g_num_sources
    global g_source_bin_list
    global streammux
    global pipeline
    # Attempt to change status of source to be released
    state_return = g_source_bin_list[source_id].set_state(Gst.State.NULL)
    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
        pad_name = "sink_%u" % source_id
        print(pad_name)
        # Retrieve sink pad to be released
        sinkpad = streammux.get_static_pad(pad_name)
        # Send flush stop event to the sink pad, then release from streammux
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
        print("STATE CHANGE SUCCESS\n")
        # Remove the source bin from the pipeline
        pipeline.remove(g_source_bin_list[source_id])
        g_num_sources -= 1
    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
    elif state_return == Gst.StateChangeReturn.ASYNC:
        state_return = g_source_bin_list[source_id].get_state(Gst.CLOCK_TIME_NONE)
        pad_name = "sink_%u" % source_id
        print(pad_name)
        sinkpad = streammux.get_static_pad(pad_name)
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
        print("STATE CHANGE ASYNC\n")
        pipeline.remove(g_source_bin_list[source_id])
        g_num_sources -= 1


def delete_sources(data):
    global loop
    global g_num_sources
    global g_eos_list
    global g_source_enabled
    # First delete sources that have reached end of stream
    for source_id in range(MAX_NUM_SOURCES):
        if g_eos_list[source_id] and g_source_enabled[source_id]:
            g_source_enabled[source_id] = False
            stop_release_source(source_id)
    # Quit if no sources remaining
    if g_num_sources == 0:
        loop.quit()
        print("All sources stopped quitting")
        return False
    # Randomly choose an enabled source to delete
    source_id = random.randrange(0, MAX_NUM_SOURCES)
    while not g_source_enabled[source_id]:
        source_id = random.randrange(0, MAX_NUM_SOURCES)
    # Disable the source
    g_source_enabled[source_id] = False
    # Release the source
    print("Calling Stop %d " % source_id)
    stop_release_source(source_id)
    # Quit if no sources remaining
    if g_num_sources == 0:
        loop.quit()
        print("All sources stopped quitting")
        return False
    return True


def add_sources(uri):
    global g_num_sources
    global g_source_enabled
    global g_source_bin_list
    # Randomly select an un-enabled source to add
    source_id = random.randrange(0, MAX_NUM_SOURCES)
    while g_source_enabled[source_id]:
        source_id = random.randrange(0, MAX_NUM_SOURCES)
    # Enable the source
    g_source_enabled[source_id] = True
    print("Calling Start %d " % source_id)
    # Create a uridecode bin with the chosen source id
    source_bin = create_uridecode_bin(source_id, uri)
    if not source_bin:
        sys.stderr.write("Failed to create source bin. Exiting.\n")
        exit(1)
    # Add source bin to our list and to pipeline
    g_source_bin_list[source_id] = source_bin
    pipeline.add(source_bin)
    # Set state of source bin to playing
    state_return = g_source_bin_list[source_id].set_state(Gst.State.PLAYING)
    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
    elif state_return == Gst.StateChangeReturn.ASYNC:
        state_return = g_source_bin_list[source_id].get_state(Gst.CLOCK_TIME_NONE)
    elif state_return == Gst.StateChangeReturn.NO_PREROLL:
        print("STATE CHANGE NO PREROLL\n")
    g_num_sources += 1
    return True


def bus_call(bus, message, loop):
    global g_eos_list
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    elif t == Gst.MessageType.ELEMENT:
        struct = message.get_structure()
        # Check for stream-eos message
        if struct is not None and struct.has_name("stream-eos"):
            parsed, stream_id = struct.get_uint("stream-id")
            if parsed:
                # Set eos status of stream to True, to be deleted in
                # delete_sources
                print("Got EOS from stream %d" % stream_id)
                g_eos_list[stream_id] = True
    return True


def main(args):
    global g_num_sources
    global g_source_bin_list
    global loop
    global pipeline
    global streammux
    global sink
    global pgie
    global sgie1
    global sgie2
    global sgie3
    global nvvideoconvert
    global nvosd
    global tiler
    global tracker

    num_sources = len(SOURCE_URI_LIST)

    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating streammux \n ")
    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    streammux.set_property("batched-push-timeout", 25000)
    streammux.set_property("batch-size", 30)
    streammux.set_property("gpu_id", GPU_ID)
    pipeline.add(streammux)
    streammux.set_property("live-source", 1)

    # uri = args[1]
    for i in range(num_sources):
        print("Creating source_bin ", i, " \n ")
        uri_name = SOURCE_URI_LIST[i]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        # Create source bin and add to pipeline
        source_bin = create_uridecode_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Failed to create source bin. Exiting. \n")
            sys.exit(1)
        g_source_bin_list[i] = source_bin
        pipeline.add(source_bin)
    g_num_sources = num_sources

    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
    print("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")
    print("Creating tiler \n ")
    tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
    print("Creating nvvidconv \n ")
    nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvideoconvert:
        sys.stderr.write(" Unable to create nvvidconv \n")
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    if is_aarch64():
        print("Creating transform \n ")
        transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
        if not transform:
            sys.stderr.write(" Unable to create transform \n")
    sgie1 = Gst.ElementFactory.make("nvinfer", "secondary1-nvinference-engine")
    if not sgie1:
        sys.stderr.write(" Unable to make sgie1 \n")
    sgie2 = Gst.ElementFactory.make("nvinfer", "secondary2-nvinference-engine")
    if not sgie2:
        sys.stderr.write(" Unable to make sgie2 \n")
    sgie3 = Gst.ElementFactory.make("nvinfer", "secondary3-nvinference-engine")
    if not sgie3:
        sys.stderr.write(" Unable to make sgie3 \n")
    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make(SINK_ELEMENT, "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")

    if is_live:
        print("At least one of the sources is live")
        streammux.set_property('live-source', 1)

    # Set streammux width and height
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    # Set pgie, sgie1, sgie2, and sgie3 configuration file paths
    pgie.set_property('config-file-path', PGIE_CONFIG_FILE)
    sgie1.set_property('config-file-path', SGIE1_CONFIG_FILE)
    sgie2.set_property('config-file-path', SGIE2_CONFIG_FILE)
    sgie3.set_property('config-file-path', SGIE3_CONFIG_FILE)

    # Set properties of tracker
    config = configparser.ConfigParser()
    config.read(TRACKER_CONFIG_FILE)
    config.sections()
    for key in config['tracker']:
        if key == 'tracker-width':
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height':
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id':
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file':
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file':
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)
        if key == 'enable-batch-process':
            tracker_enable_batch_process = config.getint('tracker', key)
            tracker.set_property('enable_batch_process',
                                 tracker_enable_batch_process)

    # Set necessary properties of the nvinfer element
    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size < MAX_NUM_SOURCES:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size,
              " with number of sources ", num_sources, " \n")
        pgie.set_property("batch-size", MAX_NUM_SOURCES)

    # Set gpu IDs of the inference engines
    pgie.set_property("gpu_id", GPU_ID)
    sgie1.set_property("gpu_id", GPU_ID)
    sgie2.set_property("gpu_id", GPU_ID)
    sgie3.set_property("gpu_id", GPU_ID)

    # Set tiler properties
    tiler_rows = int(math.sqrt(num_sources))
    tiler_columns = int(math.ceil((1.0 * num_sources) / tiler_rows))
    tiler.set_property("rows", tiler_rows)
    tiler.set_property("columns", tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)

    # Set gpu IDs of tiler, nvvideoconvert, and nvosd
    tiler.set_property("gpu_id", GPU_ID)
    nvvideoconvert.set_property("gpu_id", GPU_ID)
    nvosd.set_property("gpu_id", GPU_ID)
    # Set gpu ID of sink if not aarch64
    if not is_aarch64():
        sink.set_property("gpu_id", GPU_ID)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(sgie1)
    pipeline.add(sgie2)
    pipeline.add(sgie3)
    pipeline.add(tiler)
    pipeline.add(nvvideoconvert)
    pipeline.add(nvosd)
    pipeline.add(sink)
    if is_aarch64():
        pipeline.add(transform)

    # We link elements in the following order:
    # sourcebin -> streammux -> nvinfer -> nvtracker -> nvdsanalytics ->
    # nvtiler -> nvvideoconvert -> nvdsosd -> (if aarch64, transform ->) sink
    print("Linking elements in the Pipeline \n")
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(sgie1)
    sgie1.link(sgie2)
    sgie2.link(sgie3)
    sgie3.link(tiler)
    tiler.link(nvvideoconvert)
    nvvideoconvert.link(nvosd)
    if is_aarch64():
        nvosd.link(transform)
        transform.link(sink)
    else:
        nvosd.link(sink)
    sink.set_property("sync", 0)
    sink.set_property("qos", 0)

    # create an event loop and feed gstreamer bus messages to it
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    pipeline.set_state(Gst.State.PAUSED)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(SOURCE_URI_LIST):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start play back and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    add_sources("file:///opt/nvidia/deepstream/deepstream-5.1/samples/streams/sample_720p.mp4")
    GObject.timeout_add_seconds(10, delete_sources, g_source_bin_list)
    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
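A natural next step would be to move the internal list out of the code entirely and read it from a configuration file, so sources can change without editing the script. This is only a sketch of that idea; the JSON layout and the "sources" key are my own assumptions, not part of the sample or my gist:

```python
import json

def load_source_list(path):
    """Read a JSON file of the assumed form {"sources": [uri, ...]}
    and return the list of source URIs."""
    with open(path) as fh:
        return json.load(fh)["sources"]
```

main could then build SOURCE_URI_LIST with `load_source_list("sources.json")` at start-up instead of hard-coding the URIs.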
References
- https://docs.nvidia.com/metropolis/deepstream/dev-guide/
- https://github.com/NVIDIA-AI-IOT/deepstream_reference_apps/tree/master/runtime_source_add_delete
- https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/runtime_source_add_delete