CanMV K230 WIFI图传

使用Socket进行局域网WIFI图传,根据官方代码修改而来(毫无技术含量,图个乐)
有关Socket的基础知识请看官方文档: Socket通讯 | 01Studio
代码讲解:
1.连接WIFI(直接照抄官方代码即可):

WIFI_LED=Pin(52, Pin.OUT) #初始化WIFI指示灯

wlan = network.WLAN(network.STA_IF) #STA模式
wlan.active(True)                   #激活接口

if not wlan.isconnected():

    print('开始连接WIFI...')

    #输入WIFI账号密码(仅支持2.4G信号), 连接超过10秒为超时
    wlan.connect('B2-406', '36903092')

if wlan.isconnected(): #连接成功

    #LED蓝灯点亮
    WIFI_LED.value(1)

    HOST = wlan.ifconfig()[0]
    PORT = 8080

    #串口打印信息
    print(f'连接成功,IP:{HOST}')

这里创建完WIFI接口对象后,不要直接连接WiFi,因为如果WiFi已经在上一次程序连接成功WIFI,即使程序结束后(没有重启开发板)WiFi也会是连接状态,再次连接WiFi很有可能引发异常,所以需要先检测WiFi是否是未连接的状态再去连接WiFi
2.创建套接字为服务端,准备向浏览器(客户端)发送消息(图片):

  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  addr=socket.getaddrinfo(HOST,PORT)[0][-1]
  s.bind(addr)
  s.listen(5)
  print(f"服务器启动: {HOST}:{PORT}...")

  client, addr = s.accept()
  print(f'成功连接: {addr[0]}:{addr[1]}')

  client.send("HTTP/1.1 200 OK\r\n"
              "Server: Tao\r\n"
              "Content-Type: multipart/x-mixed-replace;boundary=Tao\r\n"
              "Cache-Control: no-cache\r\n"
              "Pragma: no-cache\r\n\r\n".encode())

这里我们使用这个代码把套接字设置为了服务器模式:

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

表示将要向客户端发送数据并且等待客户端(浏览器连接)注意,bind()函数不可以直接绑定一个元组:(“0.0.0.0”,8000),必须转化才能绑定(注意地址只能被一个程序绑定)绑定完成后浏览器直接输入IP和端口号即可连接:

addr=socket.getaddrinfo(HOST,PORT)[0][-1]

当我们接收到浏览器的连接,将会向浏览器发送以下数据:

client.send("HTTP/1.1 200 OK\r\n"
                "Server: Tao\r\n"
                "Content-Type: multipart/x-mixed-replace;boundary=Tao\r\n"
                "Cache-Control: no-cache\r\n"
                "Pragma: no-cache\r\n\r\n".encode())

其中"HTTP/1.1 200 OK"是固定的响应头,表示告诉浏览器连接成功"Content-Type: multipart/x-mixed-replace;boundary=Tao"这一行表示即将向客户端持续发送数据,提示浏览器不要断开链接,最后使用.encode()转换为二进制,这是因为再网络传输的时候,只能传输二进制.
3.初始化摄像头:

        sensor = Sensor() #构建摄像头对象
        sensor.reset() #复位和初始化摄像头
        sensor.set_framesize(Sensor.VGA)
        sensor.set_pixformat(Sensor.RGB565) #设置输出图像格式,默认通道0

        #使用IDE缓冲区输出图像
        Display.init(Display.VIRT)

        MediaManager.init() #初始化media资源管理器

        sensor.run() #启动sensor

注意一定要使用RGB888通道的的图片,否则浏览器无法解析(或者颜色通道错误)
4.循环发送图片:

while True:
    os.exitpoint() #检测IDE中断
    img = sensor.snapshot() #拍摄一张图
    Display.show_image(img) #显示图片
    imge = img.to_jpeg()
    header = "--Tao\r\n" \
    "Content-Type: image/jpeg\r\n" \
    f"Content-Length: {len(imge)}\r\n\r\n"
    client.send(header.encode())
    client.send(imge)

这里注意,显示图片可以不用压缩为jpeg图像,但是在传输的时候一定只能传输压缩后的JPEG图片否则浏览器将无法识别,到此,程序已经结束了.接下来是完整代码:

import time, os, sys
from media.sensor import * #导入sensor模块,使用摄像头相关接口
from media.display import * #导入display模块,使用display相关接口
from media.media import * #导入media模块,使用meida相关接口
import network,time,socket
from machine import Pin

WIFI_LED=Pin(52, Pin.OUT) #初始化WIFI指示灯

wlan = network.WLAN(network.STA_IF) #STA模式
wlan.active(True)                   #激活接口

if not wlan.isconnected():

    print('开始连接WIFI...')

    #输入WIFI账号密码(仅支持2.4G信号), 连接超过10秒为超时
    wlan.connect('B2-406', '36903092')

if wlan.isconnected(): #连接成功

    #LED蓝灯点亮
    WIFI_LED.value(1)

    HOST = wlan.ifconfig()[0]
    PORT = 8080

    #串口打印信息
    print(f'连接成功,IP:{HOST}')


    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    addr=socket.getaddrinfo(HOST,PORT)[0][-1]
    s.bind(addr)
    s.listen(5)
    print(f"服务器启动: {HOST}:{PORT}...")

    client, addr = s.accept()
    print(f'成功连接: {addr[0]}:{addr[1]}')

    client.send("HTTP/1.1 200 OK\r\n"
                "Server: Tao\r\n"
                "Content-Type: multipart/x-mixed-replace;boundary=Tao\r\n"
                "Cache-Control: no-cache\r\n"
                "Pragma: no-cache\r\n\r\n".encode())

    try:

        sensor = Sensor() #构建摄像头对象
        sensor.reset() #复位和初始化摄像头
        sensor.set_framesize(Sensor.VGA)
        sensor.set_pixformat(Sensor.RGB565) #设置输出图像格式,默认通道0

        #使用IDE缓冲区输出图像
        Display.init(Display.VIRT)

        MediaManager.init() #初始化media资源管理器

        sensor.run() #启动sensor

        clock = time.clock()

        while True:


            os.exitpoint() #检测IDE中断

            clock.tick()

            img = sensor.snapshot() #拍摄一张图
            Display.show_image(img) #显示图片
            imge = img.to_jpeg()
            header = "--Tao\r\n" \
             "Content-Type: image/jpeg\r\n" \
             f"Content-Length: {len(imge)}\r\n\r\n"

            client.send(header.encode())
            client.send(imge)

            print(clock.fps()) #打印FPS


    ###################
    # IDE中断释放资源代码
    ###################
    except KeyboardInterrupt as e:
        print("user stop: ", e)
    except BaseException as e:
        print(f"Exception {e}")
    finally:
        # sensor stop run
        if isinstance(sensor, Sensor):
            sensor.stop()
        # deinit display
        Display.deinit()
        os.exitpoint(os.EXITPOINT_ENABLE_SLEEP)
        time.sleep_ms(100)
        # release media buffer
        MediaManager.deinit()


else: #连接失败

    #LED闪烁3次提示
    for i in range(3):
        WIFI_LED.value(1)
        time.sleep_ms(300)
        WIFI_LED.value(0)
        time.sleep_ms(300)

    wlan.active(False)
    print('连接失败,尝试重启!')

已知bug:长时间图传会导致程序崩溃,并且再次运行代码会引发异常,需要重启才可以继续运行:

发个图片看看实际效果。一图胜千言。

效果实测10fps上下:
PixPin_2024-08-11_15-26-27

不错,谢谢啦,

感谢分享!