K230: pipeline not working in a function

I am trying to launch the object detection demo of the K230 inside a function.
The demo works correctly out of the box, but when I put it inside a function it doesn’t work anymore.

In the main function, I use the display to select the demo I want to execute, using the touch screen.
So I initiate the display, the mediamanager, create an image and wait for the touch to select the demo I want to execute. When the screen is touched, I deinit the display and the mediamanager, then I call the function containing the demo (object detection)

The demo is basically unchanged.

    print('object detection')
    display_mode="lcd2_4"
    display_size=[640,480]
    rgb888p_size=[320,320]
    pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
    print(1)
    pl.create(Sensor(width=640, height=480))  # Créer une instance Pipeline
    print(2)

But the execution seems stuck at the pl.create, and ‘2’ is never printed.
Can anyone help me?

try:

pl.create(sensor=Sensor(width=640, height=480))

Thansk for your answer. It’s the same, I get:

1
find sensor gc2093_csi2, type 31, output 1280x960@90

Here is the entire code:

# Untitled - By: FA125436 - Wed Oct 1 2025

import time, os, image
from media.display import *
from media.sensor  import *
from media.media   import *
from machine import TOUCH
from machine import FPIOA,Pin

# General parameters
DISPLAY_WIDTH  = 640
DISPLAY_HEIGHT = 480
display_mode="lcd2_4"
display_size=[640,480]
rgb888p_size=[320,320]

# Button
fpioa = FPIOA()
fpioa.set_function(21,FPIOA.GPIO21)
KEY=Pin(21,Pin.IN,Pin.PULL_UP)
# LED
fpioa.set_function(52,FPIOA.GPIO52)
LED=Pin(52,Pin.OUT) # Construct LED object, GPIO52, output

"""
# Camera init
sensor = Sensor(id=2, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT)
sensor.reset()                      # Reset and initialize the camera
sensor.set_framesize(Sensor.VGA)    # width=640, height=480, 2.4-inch, default channel 0
sensor.set_pixformat(Sensor.RGB565) # Set the output image format, default channel 0
"""
# Déclarer le touch screen
tp = TOUCH(0)


######################################################


def Object_detection():
    from libs.PipeLine import PipeLine, ScopedTiming
    from libs.AIBase import AIBase
    from libs.AI2D import Ai2d
    import time
    import nncase_runtime as nn
    import ulab.numpy as np
    import time
    import gc

    # Classe de détection YOLOv8 personnalisée
    class ObjectDetectionApp(AIBase):
        def __init__(self,kmodel_path,labels,model_input_size,max_boxes_num,confidence_threshold=0.5,nms_threshold=0.2,rgb888p_size=[224,224],display_size=[1920,1080],debug_mode=0):
            super().__init__(kmodel_path,model_input_size,rgb888p_size,debug_mode)
            self.kmodel_path=kmodel_path
            self.labels=labels
            # Résolution d'entrée du modèle
            self.model_input_size=model_input_size
            # Réglage du seuil
            self.confidence_threshold=confidence_threshold
            self.nms_threshold=nms_threshold
            self.max_boxes_num=max_boxes_num
            # Résolution d'image fournie par le capteur à l'IA
            self.rgb888p_size=[ALIGN_UP(rgb888p_size[0],16),rgb888p_size[1]]
            # Résolution d'affichage
            self.display_size=[ALIGN_UP(display_size[0],16),display_size[1]]
            self.debug_mode=debug_mode
            # Valeurs de couleur prédéfinies pour la boîte d'inspection
            self.color_four=[(255, 220, 20, 60), (255, 119, 11, 32), (255, 0, 0, 142), (255, 0, 0, 230),
                             (255, 106, 0, 228), (255, 0, 60, 100), (255, 0, 80, 100), (255, 0, 0, 70),
                             (255, 0, 0, 192), (255, 250, 170, 30), (255, 100, 170, 30), (255, 220, 220, 0),
                             (255, 175, 116, 175), (255, 250, 0, 30), (255, 165, 42, 42), (255, 255, 77, 255),
                             (255, 0, 226, 252), (255, 182, 182, 255), (255, 0, 82, 0), (255, 120, 166, 157)]
            # Rapport entre la largeur et la hauteur
            self.x_factor = float(self.rgb888p_size[0])/self.model_input_size[0]
            self.y_factor = float(self.rgb888p_size[1])/self.model_input_size[1]
            # Instance Ai2d, utilisée pour implémenter le prétraitement du modèle
            self.ai2d=Ai2d(debug_mode)
            # Configurer les formats et types d'entrée et de sortie pour Ai2d
            self.ai2d.set_ai2d_dtype(nn.ai2d_format.NCHW_FMT,nn.ai2d_format.NCHW_FMT,np.uint8, np.uint8)

        # Configurez les opérations de prétraitement ; ici, la fonction resize est utilisée
        # Ai2d prend en charge les fonctions crop/shift/pad/resize/affine. Voir /sdcard/libs/AI2D.py
        def config_preprocess(self,input_image_size=None):
            with ScopedTiming("set preprocess config",self.debug_mode > 0):
                # Initialisez la configuration de prétraitement ai2d.
                # Par défaut, celle-ci adopte les dimensions fournies par le capteur à l'IA.
                # Vous pouvez modifier manuellement la taille d'entrée en configurant le paramètre input_image_size.
                ai2d_input_size=input_image_size if input_image_size else self.rgb888p_size
                self.ai2d.resize(nn.interp_method.tf_bilinear, nn.interp_mode.half_pixel)
                self.ai2d.build([1,3,ai2d_input_size[1],ai2d_input_size[0]],[1,3,self.model_input_size[1],self.model_input_size[0]])

        # Personnalisez le post-traitement pour la tâche en cours
        def postprocess(self,results):
            with ScopedTiming("postprocess",self.debug_mode > 0):
                result=results[0]
                result = result.reshape((result.shape[0] * result.shape[1], result.shape[2]))
                output_data = result.transpose()
                boxes_ori = output_data[:,0:4]
                scores_ori = output_data[:,4:]
                confs_ori = np.max(scores_ori,axis=-1)
                inds_ori = np.argmax(scores_ori,axis=-1)
                boxes,scores,inds = [],[],[]
                for i in range(len(boxes_ori)):
                    if confs_ori[i] > confidence_threshold:
                        scores.append(confs_ori[i])
                        inds.append(inds_ori[i])
                        x = boxes_ori[i,0]
                        y = boxes_ori[i,1]
                        w = boxes_ori[i,2]
                        h = boxes_ori[i,3]
                        left = int((x - 0.5 * w) * self.x_factor)
                        top = int((y - 0.5 * h) * self.y_factor)
                        right = int((x + 0.5 * w) * self.x_factor)
                        bottom = int((y + 0.5 * h) * self.y_factor)
                        boxes.append([left,top,right,bottom])
                if len(boxes)==0:
                    return []
                boxes = np.array(boxes)
                scores = np.array(scores)
                inds = np.array(inds)
                # Processus NMS
                keep = self.nms(boxes,scores,nms_threshold)
                dets = np.concatenate((boxes, scores.reshape((len(boxes),1)), inds.reshape((len(boxes),1))), axis=1)
                dets_out = []
                for keep_i in keep:
                    dets_out.append(dets[keep_i])
                dets_out = np.array(dets_out)
                dets_out = dets_out[:self.max_boxes_num, :]
                return dets_out

        # Résultats du graphique
        def draw_result(self,pl,dets):
            with ScopedTiming("display_draw",self.debug_mode >0):
                if dets:
                    pl.osd_img.clear()
                    for det in dets:
                        x1, y1, x2, y2 = map(lambda x: int(round(x, 0)), det[:4])
                        x= x1*self.display_size[0] // self.rgb888p_size[0]
                        y= y1*self.display_size[1] // self.rgb888p_size[1]
                        w = (x2 - x1) * self.display_size[0] // self.rgb888p_size[0]
                        h = (y2 - y1) * self.display_size[1] // self.rgb888p_size[1]
                        pl.osd_img.draw_rectangle(x,y, w, h, color=self.get_color(int(det[5])),thickness=4)
                        pl.osd_img.draw_string_advanced( x , y-50,32," " + self.labels[int(det[5])] + " " + str(round(det[4],2)) , color=self.get_color(int(det[5])))
                else:
                    pl.osd_img.clear()


        # Détection multi-objets : mise en œuvre de la méthode de suppression non maximale
        def nms(self,boxes,scores,thresh):
            """Pure Python NMS baseline."""
            x1,y1,x2,y2 = boxes[:, 0],boxes[:, 1],boxes[:, 2],boxes[:, 3]
            areas = (x2 - x1 + 1) * (y2 - y1 + 1)
            order = np.argsort(scores,axis = 0)[::-1]
            keep = []
            while order.size > 0:
                i = order[0]
                keep.append(i)
                new_x1,new_y1,new_x2,new_y2,new_areas = [],[],[],[],[]
                for order_i in order:
                    new_x1.append(x1[order_i])
                    new_x2.append(x2[order_i])
                    new_y1.append(y1[order_i])
                    new_y2.append(y2[order_i])
                    new_areas.append(areas[order_i])
                new_x1 = np.array(new_x1)
                new_x2 = np.array(new_x2)
                new_y1 = np.array(new_y1)
                new_y2 = np.array(new_y2)
                xx1 = np.maximum(x1[i], new_x1)
                yy1 = np.maximum(y1[i], new_y1)
                xx2 = np.minimum(x2[i], new_x2)
                yy2 = np.minimum(y2[i], new_y2)
                w = np.maximum(0.0, xx2 - xx1 + 1)
                h = np.maximum(0.0, yy2 - yy1 + 1)
                inter = w * h
                new_areas = np.array(new_areas)
                ovr = inter / (areas[i] + new_areas - inter)
                new_order = []
                for ovr_i,ind in enumerate(ovr):
                    if ind < thresh:
                        new_order.append(order[ovr_i])
                order = np.array(new_order,dtype=np.uint8)
            return keep

        # Obtenir la couleur de la case en fonction de l'index de catégorie actuel
        def get_color(self, x):
            idx=x%len(self.color_four)
            return self.color_four[idx]

    print('object detection')
    display_mode="lcd2_4"
    display_size=[640,480]
    rgb888p_size=[320,320]
    pl=PipeLine(rgb888p_size=rgb888p_size,display_size=display_size,display_mode=display_mode)
    print(1)
    pl.create(sensor=Sensor(width=640, height=480))  # Créer une instance Pipeline
    print(2) # <----  Never printed

    # Chemin du modèle
    kmodel_path="/sdcard/examples/kmodel/yolov8n_320.kmodel"
    labels = ["person", "bicycle", "car", "motorcycle", "airplane", "bus", "train",
              "truck", "boat", "traffic light", "fire hydrant", "stop sign", "parking meter",
              "bench", "bird", "cat", "dog", "horse", "sheep", "cow", "elephant", "bear",
              "zebra", "giraffe", "backpack", "umbrella", "handbag", "tie", "suitcase",
              "frisbee", "skis", "snowboard", "sports ball", "kite", "baseball bat",
              "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
              "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
              "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut",
              "cake", "chair", "couch", "potted plant", "bed", "dining table", "toilet",
              "tv", "laptop", "mouse", "remote", "keyboard", "cell phone", "microwave",
              "oven", "toaster", "sink", "refrigerator", "book", "clock", "vase", "scissors",
              "teddy bear", "hair drier", "toothbrush"]
    # Autres réglages des paramètres
    confidence_threshold = 0.2
    nms_threshold = 0.2
    max_boxes_num = 20

   # Initialiser une instance de détection d'objet personnalisée
    print('instance')
    ob_det=ObjectDetectionApp(kmodel_path,labels=labels,model_input_size=[320,320],
                              max_boxes_num=max_boxes_num,confidence_threshold=confidence_threshold,
                              nms_threshold=nms_threshold,rgb888p_size=rgb888p_size,
                              display_size=display_size,debug_mode=0)
    ob_det.config_preprocess()

    clock = time.clock()
    print('check button')
    while KEY.value()!=0:
        clock.tick()
        print('get frame')
        img=pl.get_frame()  # Récupérer les données de l'image actuelle
        res=ob_det.run(img) # Déduire l'image actuelle
        ob_det.draw_result(pl,res) # Tracer les résultats sur l'image OSD de PipeLine
        # print(res)  # Imprimer les résultats actuels
        pl.show_image() # Afficher le résultat actuel du rendu
        gc.collect()
        # print(f'FPS= {clock.fps()} images per second') # Fréquence d'impression

    pl.destroy()


def printList(img, files):
    y = 0
    img.clear()
    print(files)
    for f in files:
        print(f, y)
        img.draw_string_advanced(x, y, size, f, color=(0, 255, 255))
        y += dy
    Display.show_image(img)


#######################################################

# Main code
os.exitpoint(os.EXITPOINT_ENABLE)
# Available functions
files = ["Object_detection", "Face detection", "Exit"]
isInitDisplay = False

x = 0
size = 35
dy   = 50
radius = 4
exit = False

# Write list of files on LCD
try:
    while True:
        if not isInitDisplay:
            print('init display and mediamanager')
            Display.init(Display.ST7701,width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT, to_ide=True)
            MediaManager.init()
            isInitDisplay = True
            # Prepare LCD display
            img = image.Image(DISPLAY_WIDTH, DISPLAY_HEIGHT, image.RGB565)
            printList(img, files)
        # Check touch
        time.sleep_ms(100)
        p = tp.read(1)
        if len(p) != 0:
            if p[0].event != TOUCH.EVENT_NONE: # Touch !
                n = int(p[0].y / dy)
                if n < len(files):
                    module = files[n]
                    y = n * dy
                    img.draw_circle(p[0].x, p[0].y, radius, color=(0, 255, 0), fill=True)
                    img.draw_string_advanced(x, y, size, module, color=(255, 0, 0))
                    Display.show_image(img)
                    time.sleep(1)
                    print(f'Module: {module}')
                    if module == "Exit":
                        exit = True
                    else: # execute the module
                        isInitDisplay = False
                        Display.deinit()
                        time.sleep_ms(100)
                        MediaManager.deinit()
                        # Initialiser le pipeline
                        time.sleep_ms(100)
                        print('calling function')
                        globals()[module]()
                        print(f'End of {module}\n')
        if exit:
            break
except KeyboardInterrupt as e:
    print(" User stopped:", e)
except BaseException as e:
    print(f" Exception: {e}")

# Clean exit
if isInitDisplay:
    Display.deinit()
    os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
    time.sleep_ms(100)
    MediaManager.deinit()

Can you try it also?