NVIDIA DeepStream SDK 動態增減輸入源

最近 AI Team 在研究如何使用 NVIDIA 的 Sample 做動態的增減 Source,而需求是不想外部輸入 Source 改成程式內部使用列表帶入,而且可以在後面動態加入 Source,所以我對程式做了一點修改不過在開始說我修改了什麼之前先來看一下目前的 Sample。

DeepStream 範例分為 C/C++ 與 Python 兩種

首先我們可以看到 DeepStream 有分為 C/C++ 的 Sample 與 Python 兩種基本上它們大同小異,理解起來也是用差不多的邏輯寫的。而我們公司平常使用 Python 來開發所以可以看到 deepstream_python_apps 裡面的 runtime_source_add_delete 資料夾,它就是動態增減輸入源的範例。

動態增減輸入源範例 runtime_source_add_delete

執行專案

要把它跑起來之前先 clone 專案

$ git clone https://github.com/NVIDIA-AI-IOT/deepstream_python_apps.git
$ cd runtime_source_add_delete

切換到資料後可以看到 deepstream_rt_src_add_del.py 是它的執行檔案,而要把它跑起來很簡單只要使用一行指令就可以完成了

$ python3 deepstream_rt_src_add_del.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4

在它執行中可以發現它每 10s 就會把一個 source 放入執行,而以這邊的範例來說就是把 file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 放入下去執行,而在一開始 MAX_NUM_SOURCES 宣告了 4,所以最多加入到 4 就會開始每隔 10s 刪除,直到變成 0 結束整個程式。

運作原理

為什麼沒辦法處理多個 source

知道它是怎麼運作之後就可以來看 main 是怎麼處理這支程式的,不過在開始之前可能會有一個疑問它其他的範例都是可以一次放入多個 source 為什麼就這隻不行,可以看到 L356 它這邊把 uri 寫死了,所以 source 的部分只會使用第一個 source 放入

uri = args[1]

而且 add_sources(data) 在 L247 又只使用這個變數所以它就沒辦法處理多個 source 的情況

source_bin = create_uridecode_bin(source_id, uri)

pipeline 順序

可以看到它主體就是把多個功能使用 pipeline 的方法串接起來,而串接的順序

sourcebin -> streammux -> nvinfer -> nvtracker -> nvdsanalytics -> nvtiler -> nvvideoconvert -> nvdsosd -> (if aarch64, transform ->) sink

程式主體分析

可以看到 L541 它用來處裡 10s 執行 add_sources

GObject.timeout_add_seconds(10, add_sources, g_source_bin_list)

所以每 10s 就會執行新增 source 的動作可以看到 L237 – L239 會去檢查目前 source 是不是在開啟的狀態,如果不是就重新找新的 source

    source_id = random.randrange(0, MAX_NUM_SOURCES)
    while (g_source_enabled[source_id]):
        source_id = random.randrange(0, MAX_NUM_SOURCES)

L277 – L279 可以看到它把數量加到 MAX_NUM_SOURCES 後就會執行 delete_sources 把 source 都清理乾淨

    if (g_num_sources == MAX_NUM_SOURCES):
        GObject.timeout_add_seconds(10, delete_sources, g_source_bin_list)
        return False

而在 delete_sources 可以看到 L199 – L200 會照順序去檢查,所有的影像 List 檢查影片是否播放完成與是否是執行狀態再對它做資源回收

    for source_id in range(MAX_NUM_SOURCES):
        if (g_eos_list[source_id] and g_source_enabled[source_id]):
            g_source_enabled[source_id] = False
            stop_release_source(source_id)

L205 – L208L221 – L224 會去確定都播放完成就結束程式

    if (g_num_sources == 0):
        loop.quit()
        print("All sources stopped quitting")
        return False

以上就是簡單的程式原理介紹。

修改 runtime_source_add_delete 使用列表

上面提到我修改了它使用程式內部 List 的方法執行,有需要可以參考我的修改方法。 Gist

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from gi.repository import GLib
from ctypes import *
import time
import sys
import math
import random
import platform
from common.is_aarch_64 import is_aarch64

import pyds

MAX_DISPLAY_LEN=64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GPU_ID = 0
MAX_NUM_SOURCES = 4
SINK_ELEMENT = "nveglglessink"
PGIE_CONFIG_FILE = "dstest_pgie_config.txt"
TRACKER_CONFIG_FILE = "dstest_tracker_config.txt"

SGIE1_CONFIG_FILE = "dstest_sgie1_config.txt"
SGIE2_CONFIG_FILE = "dstest_sgie2_config.txt"
SGIE3_CONFIG_FILE = "dstest_sgie3_config.txt"

CONFIG_GPU_ID = "gpu-id"
CONFIG_GROUP_TRACKER = "tracker"
CONFIG_GROUP_TRACKER_WIDTH = "tracker-width"
CONFIG_GROUP_TRACKER_HEIGHT = "tracker-height"
CONFIG_GROUP_TRACKER_LL_CONFIG_FILE = "ll-config-file"
CONFIG_GROUP_TRACKER_LL_LIB_FILE = "ll-lib-file"
CONFIG_GROUP_TRACKER_ENABLE_BATCH_PROCESS = "enable-batch-process"

g_num_sources = 0
g_source_id_list = [0] * MAX_NUM_SOURCES
g_eos_list = [False] * MAX_NUM_SOURCES
g_source_enabled = [False] * MAX_NUM_SOURCES
g_source_bin_list = [None] * MAX_NUM_SOURCES

pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]

loop = None
pipeline = None
streammux = None
sink = None
pgie = None
sgie1 = None
sgie2 = None
sgie3 = None
nvvideoconvert = None
nvosd = None
tiler = None
tracker = None

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)   
    if(name.find("nvv4l2decoder") != -1):
        if (is_aarch64()):
            Object.set_property("enable-max-performance", True)
            Object.set_property("drop-frame-interval", 0)
            Object.set_property("num-extra-surfaces", 0)
        else:
            Object.set_property("gpu_id", GPU_ID)


def cb_newpad(decodebin,pad,data):
    global streammux
    print("In cb_newpad\n")
    caps=pad.get_current_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        source_id = data
        pad_name = "sink_%u" % source_id
        print(pad_name)
        #Get a sink pad from the streammux, link to decodebin
        sinkpad = streammux.get_request_pad(pad_name)
        if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
            print("Decodebin linked to pipeline")
        else:
            sys.stderr.write("Failed to link decodebin to pipeline\n")


def create_uridecode_bin(index,filename):
    global g_source_id_list
    print("Creating uridecodebin for [%s]" % filename)

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    g_source_id_list[index] = index
    bin_name="source-bin-%02d" % index
    print(bin_name)

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    bin=Gst.ElementFactory.make("uridecodebin", bin_name)
    if not bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    bin.set_property("uri",filename)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    bin.connect("pad-added",cb_newpad,g_source_id_list[index])
    bin.connect("child-added",decodebin_child_added,g_source_id_list[index])

    #Set status of the source to enabled
    g_source_enabled[index] = True

    return bin


def stop_release_source(source_id):
    global g_num_sources
    global g_source_bin_list
    global streammux
    global pipeline

    #Attempt to change status of source to be released 
    state_return = g_source_bin_list[source_id].set_state(Gst.State.NULL)

    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
        pad_name = "sink_%u" % source_id
        print(pad_name)
        #Retrieve sink pad to be released
        sinkpad = streammux.get_static_pad(pad_name)
        #Send flush stop event to the sink pad, then release from the streammux
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
        print("STATE CHANGE SUCCESS\n")
        #Remove the source bin from the pipeline
        pipeline.remove(g_source_bin_list[source_id])
        source_id -= 1
        g_num_sources -= 1

    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
    
    elif state_return == Gst.StateChangeReturn.ASYNC:
        state_return = g_source_bin_list[source_id].get_state(Gst.CLOCK_TIME_NONE)
        pad_name = "sink_%u" % source_id
        print(pad_name)
        sinkpad = streammux.get_static_pad(pad_name)
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        streammux.release_request_pad(sinkpad)
        print("STATE CHANGE ASYNC\n")
        pipeline.remove(g_source_bin_list[source_id])
        source_id -= 1
        g_num_sources -= 1


def delete_sources(data):
    global loop
    global g_num_sources
    global g_eos_list
    global g_source_enabled

    #First delete sources that have reached end of stream
    for source_id in range(MAX_NUM_SOURCES):
        if (g_eos_list[source_id] and g_source_enabled[source_id]):
            g_source_enabled[source_id] = False
            stop_release_source(source_id)

    #Quit if no sources remaining
    if (g_num_sources == 0):
        loop.quit()
        print("All sources stopped quitting")
        return False

    #Randomly choose an enabled source to delete
    source_id = random.randrange(0, MAX_NUM_SOURCES)
    while (not g_source_enabled[source_id]):
        source_id = random.randrange(0, MAX_NUM_SOURCES)
    #Disable the source
    g_source_enabled[source_id] = False
    #Release the source
    print("Calling Stop %d " % source_id)
    stop_release_source(source_id)

    #Quit if no sources remaining
    if (g_num_sources == 0):
        loop.quit()
        print("All sources stopped quitting")
        return False

    return True


def add_sources(uri):
    global g_num_sources
    global g_source_enabled
    global g_source_bin_list

    #Randomly select an un-enabled source to add
    source_id = random.randrange(0, MAX_NUM_SOURCES)
    while (g_source_enabled[source_id]):
        source_id = random.randrange(0, MAX_NUM_SOURCES)

    #Enable the source
    g_source_enabled[source_id] = True

    print("Calling Start %d " % source_id)

    #Create a uridecode bin with the chosen source id
    source_bin = create_uridecode_bin(source_id, uri)

    if (not source_bin):
        sys.stderr.write("Failed to create source bin. Exiting.")
        exit(1)
    
    #Add source bin to our list and to pipeline
    g_source_bin_list[source_id] = source_bin
    pipeline.add(source_bin)

    #Set state of source bin to playing
    state_return = g_source_bin_list[source_id].set_state(Gst.State.PLAYING)

    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS\n")
        source_id += 1

    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE\n")
    
    elif state_return == Gst.StateChangeReturn.ASYNC:
        state_return = g_source_bin_list[source_id].get_state(Gst.CLOCK_TIME_NONE)
        source_id += 1

    elif state_return == Gst.StateChangeReturn.NO_PREROLL:
        print("STATE CHANGE NO PREROLL\n")

    g_num_sources += 1
    
    return True

def bus_call(bus, message, loop):
    global g_eos_list
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    elif t == Gst.MessageType.ELEMENT:
        struct = message.get_structure()
        #Check for stream-eos message
        if struct is not None and struct.has_name("stream-eos"):
            parsed, stream_id = struct.get_uint("stream-id")
            if parsed:
                #Set eos status of stream to True, to be deleted in delete-sources
                print("Got EOS from stream %d" % stream_id)
                g_eos_list[stream_id] = True
    return True

def main(args):
    global g_num_sources
    global g_source_bin_list
    # global uri

    global loop
    global pipeline
    global streammux
    global sink
    global pgie
    global sgie1
    global sgie2
    global sgie3
    global nvvideoconvert
    global nvosd
    global tiler
    global tracker

    g_source_bin_list.append("file:///opt/nvidia/deepstream/deepstream-5.1/sources/python/apps/deepstream-nvdsanalytics/Car_stream.h264")
    g_source_bin_list.append("file:///opt/nvidia/deepstream/deepstream-5.1/sources/python/apps/deepstream-nvdsanalytics/People_stream.h264")
    num_sources=len(g_source_bin_list)

    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streammux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    streammux.set_property("batched-push-timeout", 25000)
    streammux.set_property("batch-size", 30)
    streammux.set_property("gpu_id", GPU_ID)

    pipeline.add(streammux)
    streammux.set_property("live-source", 1)
    # uri = args[1]
    for i in range(num_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=g_source_bin_list[i]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        #Create first source bin and add to pipeline
        source_bin=create_uridecode_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Failed to create source bin. Exiting. \n")
            sys.exit(1)
        g_source_bin_list[i] = source_bin
        pipeline.add(source_bin)

    g_num_sources = num_sources

    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    print("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")

    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")

    print("Creating nvvidconv \n ")
    nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvideoconvert:
        sys.stderr.write(" Unable to create nvvidconv \n")

    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")

    if(is_aarch64()):
        print("Creating transform \n ")
        transform=Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
        if not transform:
            sys.stderr.write(" Unable to create transform \n")

    sgie1 = Gst.ElementFactory.make("nvinfer", "secondary1-nvinference-engine")
    if not sgie1:
        sys.stderr.write(" Unable to make sgie1 \n")

    sgie2 = Gst.ElementFactory.make("nvinfer", "secondary2-nvinference-engine")
    if not sgie1:
        sys.stderr.write(" Unable to make sgie2 \n")

    sgie3 = Gst.ElementFactory.make("nvinfer", "secondary3-nvinference-engine")
    if not sgie3:
        sys.stderr.write(" Unable to make sgie3 \n")


    print("Creating EGLSink \n")
    sink = Gst.ElementFactory.make(SINK_ELEMENT, "nvvideo-renderer")
    if not sink:
        sys.stderr.write(" Unable to create egl sink \n")

    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)

    #Set streammux width and height
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    #Set pgie, sgie1, sgie2, and sgie3 configuration file paths
    pgie.set_property('config-file-path', PGIE_CONFIG_FILE)
    sgie1.set_property('config-file-path', SGIE1_CONFIG_FILE)
    sgie2.set_property('config-file-path', SGIE2_CONFIG_FILE)
    sgie3.set_property('config-file-path', SGIE3_CONFIG_FILE)

    #Set properties of tracker
    config = configparser.ConfigParser()
    config.read(TRACKER_CONFIG_FILE)
    config.sections()

    for key in config['tracker']:
        if key == 'tracker-width' :
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height' :
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id' :
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file' :
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file' :
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)
        if key == 'enable-batch-process' :
            tracker_enable_batch_process = config.getint('tracker', key)
            tracker.set_property('enable_batch_process', tracker_enable_batch_process)

    #Set necessary properties of the nvinfer element, the necessary ones are:
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size < MAX_NUM_SOURCES):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", num_sources," \n")
    pgie.set_property("batch-size",MAX_NUM_SOURCES)

    #Set gpu IDs of the inference engines
    pgie.set_property("gpu_id", GPU_ID)
    sgie1.set_property("gpu_id", GPU_ID)
    sgie2.set_property("gpu_id", GPU_ID)
    sgie3.set_property("gpu_id", GPU_ID)

    #Set tiler properties
    tiler_rows=int(math.sqrt(num_sources))
    tiler_columns=int(math.ceil((1.0*num_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)

    #Set gpu IDs of tiler, nvvideoconvert, and nvosd
    tiler.set_property("gpu_id", GPU_ID)
    nvvideoconvert.set_property("gpu_id", GPU_ID)
    nvosd.set_property("gpu_id", GPU_ID)

    #Set gpu ID of sink if not aarch64
    if(not is_aarch64()):
        sink.set_property("gpu_id", GPU_ID)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(sgie1)
    pipeline.add(sgie2)
    pipeline.add(sgie3)
    pipeline.add(tiler)
    pipeline.add(nvvideoconvert)
    pipeline.add(nvosd)
    pipeline.add(sink)

    if is_aarch64():
        pipeline.add(transform)

    # We link elements in the following order:
    # sourcebin -> streammux -> nvinfer -> nvtracker -> nvdsanalytics ->
    # nvtiler -> nvvideoconvert -> nvdsosd -> (if aarch64, transform ->) sink
    print("Linking elements in the Pipeline \n")
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(sgie1)
    sgie1.link(sgie2)
    sgie2.link(sgie3)
    sgie3.link(tiler)
    tiler.link(nvvideoconvert)
    nvvideoconvert.link(nvosd)
    if is_aarch64():
        nvosd.link(transform)
        transform.link(sink)
    else:
        nvosd.link(sink)

    sink.set_property("sync", 0)
    sink.set_property("qos",0)

    # create an event loop and feed gstreamer bus mesages to it
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)

    pipeline.set_state(Gst.State.PAUSED)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        if (i != 0):
            print(i, ": ", source)

    print("Starting pipeline \n")
    # start play back and listed to events		
    pipeline.set_state(Gst.State.PLAYING)

    add_sources("file:///opt/nvidia/deepstream/deepstream-5.1/samples/streams/sample_720p.mp4")
    GObject.timeout_add_seconds(10, delete_sources, g_source_bin_list)

    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

if __name__ == '__main__':
    sys.exit(main(sys.argv))

參考資料