4. Video Tutorial Explanation#
1. Overview#
The K230 supports H264 or H265 encoding for video streams. The API provided in CanMV allows users to record and play MP4 files and also supports RTSP streaming.
2. Examples#
2.1 MP4 Recording#
This example program demonstrates the mp4 muxer functionality on the CanMV development board.
# Save MP4 file example
#
# Note: You will need an SD card to run this example.
#
# You can capture audio and video and save them as MP4. The current version only supports MP4 format, video supports 264/265, and audio supports g711a/g711u.
from media.mp4format import *
import os
def mp4_muxer_test():
print("mp4_muxer_test start")
width = 1280
height = 720
# Instantiate mp4 container
mp4_muxer = Mp4Container()
mp4_cfg = Mp4CfgStr(mp4_muxer.MP4_CONFIG_TYPE_MUXER)
if mp4_cfg.type == mp4_muxer.MP4_CONFIG_TYPE_MUXER:
file_name = "/sdcard/examples/test.mp4"
mp4_cfg.SetMuxerCfg(file_name, mp4_muxer.MP4_CODEC_ID_H265, width, height, mp4_muxer.MP4_CODEC_ID_G711U)
# Create mp4 muxer
mp4_muxer.Create(mp4_cfg)
# Start mp4 muxer
mp4_muxer.Start()
frame_count = 0
try:
while True:
os.exitpoint()
# Process audio and video data, write to file in MP4 format
mp4_muxer.Process()
frame_count += 1
print("frame_count = ", frame_count)
if frame_count >= 200:
break
except BaseException as e:
print(e)
# Stop mp4 muxer
mp4_muxer.Stop()
# Destroy mp4 muxer
mp4_muxer.Destroy()
print("mp4_muxer_test stop")
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
mp4_muxer_test()
Tip
For specific interface definitions, please refer to player
2.2 MP4 Playback#
This example program demonstrates the functionality of an mp4 file player on the CanMV development board.
# Play MP4 file example
#
# Note: You will need an SD card to run this example.
#
# You can load local files to play. The current version only supports MP4 format, video supports 264/265, and audio supports g711a/g711u.
from media.player import * # Import player module for playing mp4 files
import os
start_play = False # Playback end flag
def player_event(event, data):
global start_play
if event == K_PLAYER_EVENT_EOF: # Playback end marker
start_play = False # Set playback end marker
def play_mp4_test(filename):
global start_play
# player = Player() # Create player object
# Use IDE as output display, can set any resolution
player = Player(Display.VIRT)
# Use ST7701 LCD screen as output display, max resolution 800*480
# player = Player(Display.ST7701)
# Use HDMI as output display
# player = Player(Display.LT9611)
player.load(filename) # Load mp4 file
player.set_event_callback(player_event) # Set player event callback
player.start() # Start playback
start_play = True
# Wait for playback to end
try:
while start_play:
time.sleep(0.1)
os.exitpoint()
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
sys.print_exception(e)
player.stop() # Stop playback
print("play over")
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
play_mp4_test("/sdcard/examples/test.mp4") # Play mp4 file
Tip
For specific interface definitions, please refer to player
2.3 H264/H265 Encoding#
This example program demonstrates the video encoding functionality on the CanMV development board.
# Video encode example
#
# Note: You will need an SD card to run this example.
#
# You can capture videos and encode them into 264/265 files
from media.vencoder import *
from media.sensor import *
from media.media import *
import time, os
def vi_bind_venc_test(file_name, width=1280, height=720):
print("venc_test start")
venc_chn = VENC_CHN_ID_0
width = ALIGN_UP(width, 16)
venc_payload_type = K_PT_H264
# Determine file type
suffix = file_name.split('.')[-1]
if suffix == '264':
venc_payload_type = K_PT_H264
elif suffix == '265':
venc_payload_type = K_PT_H265
else:
print("Unknown file extension")
return
# Initialize sensor
sensor = Sensor()
sensor.reset()
# Set camera output buffer
# set chn0 output size
sensor.set_framesize(width=width, height=height, alignment=12)
# set chn0 output format
sensor.set_pixformat(Sensor.YUV420SP)
# Instantiate video encoder
encoder = Encoder()
# Set video encoder output buffer
encoder.SetOutBufs(venc_chn, 8, width, height)
# Bind camera and venc
link = MediaManager.link(sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, venc_chn))
# Initialize media manager
MediaManager.init()
if venc_payload_type == K_PT_H264:
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, width, height)
elif venc_payload_type == K_PT_H265:
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H265, encoder.H265_PROFILE_MAIN, width, height)
streamData = StreamData()
# Create encoder
encoder.Create(venc_chn, chnAttr)
# Start encoding
encoder.Start(venc_chn)
# Start camera
sensor.run()
frame_count = 0
print("save stream to file: ", file_name)
with open(file_name, "wb") as fo:
try:
while True:
os.exitpoint()
encoder.GetStream(venc_chn, streamData) # Get a frame stream
for pack_idx in range(0, streamData.pack_cnt):
stream_data = uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx])
fo.write(stream_data) # Stream write to file
print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])
encoder.ReleaseStream(venc_chn, streamData) # Release a frame stream
frame_count += 1
if frame_count >= 200:
break
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
import sys
sys.print_exception(e)
# Stop camera
sensor.stop()
# Destroy camera and venc binding
del link
# Stop encoding
encoder.Stop(venc_chn)
# Destroy encoder
encoder.Destroy(venc_chn)
# Clean buffer
MediaManager.deinit()
print("venc_test stop")
def stream_venc_test(file_name, width=1280, height=720):
print("venc_test start")
venc_chn = VENC_CHN_ID_0
width = ALIGN_UP(width, 16)
venc_payload_type = K_PT_H264
# Determine file type
suffix = file_name.split('.')[-1]
if suffix == '264':
venc_payload_type = K_PT_H264
elif suffix == '265':
venc_payload_type = K_PT_H265
else:
print("Unknown file extension")
return
# Initialize sensor
sensor = Sensor()
sensor.reset()
# Set camera output buffer
# set chn0 output size
sensor.set_framesize(width=width, height=height, alignment=12)
# set chn0 output format
sensor.set_pixformat(Sensor.YUV420SP)
# Instantiate video encoder
encoder = Encoder()
# Set video encoder output buffer
encoder.SetOutBufs(venc_chn, 8, width, height)
# Initialize media manager
MediaManager.init()
if venc_payload_type == K_PT_H264:
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H264, encoder.H264_PROFILE_MAIN, width, height)
elif venc_payload_type == K_PT_H265:
chnAttr = ChnAttrStr(encoder.PAYLOAD_TYPE_H265, encoder.H265_PROFILE_MAIN, width, height)
streamData = StreamData()
# Create encoder
encoder.Create(venc_chn, chnAttr)
# Start encoding
encoder.Start(venc_chn)
# Start camera
sensor.run()
frame_count = 0
print("save stream to file: ", file_name)
yuv420sp_img = None
frame_info = k_video_frame_info()
with open(file_name, "wb") as fo:
try:
while True:
os.exitpoint()
yuv420sp_img = sensor.snapshot(chn=CAM_CHN_ID_0)
if yuv420sp_img == -1:
continue
frame_info.v_frame.width = yuv420sp_img.width()
frame_info.v_frame.height = yuv420sp_img.height()
frame_info.v_frame.pixel_format = Sensor.YUV420SP
frame_info.pool_id = yuv420sp_img.poolid()
frame_info.v_frame.phys_addr[0] = yuv420sp_img.phyaddr()
if yuv420sp_img.width() == 800 and yuv420sp_img.height() == 480:
frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width * frame_info.v_frame.height + 1024
elif yuv420sp_img.width() == 1920 and yuv420sp_img.height() == 1080:
frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width * frame_info.v_frame.height + 3072
elif yuv420sp_img.width() == 640 and yuv420sp_img.height() == 360:
frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width * frame_info.v_frame.height + 3072
else:
frame_info.v_frame.phys_addr[1] = frame_info.v_frame.phys_addr[0] + frame_info.v_frame.width * frame_info.v_frame.height
encoder.SendFrame(venc_chn, frame_info)
encoder.GetStream(venc_chn, streamData) # Get a frame stream
for pack_idx in range(0, streamData.pack_cnt):
stream_data = uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx])
fo.write(stream_data) # Stream write to file
print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])
encoder.ReleaseStream(venc_chn, streamData) # Release a frame stream
frame_count += 1
if frame_count >= 200:
break
except KeyboardInterrupt as e:
print("user stop: ", e)
except BaseException as e:
import sys
sys.print_exception(e)
# Stop camera
sensor.stop()
# Stop encoding
encoder.Stop(venc_chn)
# Destroy encoder
encoder.Destroy(venc_chn)
# Clean buffer
MediaManager.deinit()
print("venc_test stop")
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
vi_bind_venc_test("/sdcard/examples/test.264", 800, 480) # vi bind venc example
# stream_venc_test("/sdcard/examples/test.264", 800, 480) # venc encode data stream example
Tip
For specific interface definitions, please refer to VENC
2.4 H264/H265 Decoding#
This example program demonstrates the video decoding functionality on the CanMV development board.
# Video encode example
#
# Note: You will need an SD card to run this example.
# (The rest of the code for decoding would follow a similar pattern to the encoding example.)
# You can decode 264/265 and display them on the screen
from media.media import *
from mpp.payload_struct import*
import media.vdecoder as vdecoder
from media.display import *
import time, os
STREAM_SIZE = 40960
def vdec_test(file_name, width=1280, height=720):
print("vdec_test start")
vdec_chn = VENC_CHN_ID_0
vdec_width = ALIGN_UP(width, 16)
vdec_height = height
vdec = None
vdec_payload_type = K_PT_H264
# display_type = Display.VIRT
display_type = Display.ST7701 # Use ST7701 LCD screen as output display, max resolution 800*480
# display_type = Display.LT9611 # Use HDMI as output display
# Determine file type
suffix = file_name.split('.')[-1]
if suffix == '264':
vdec_payload_type = K_PT_H264
elif suffix == '265':
vdec_payload_type = K_PT_H265
else:
print("Unknown file extension")
return
# Instantiate video decoder
# vdecoder.Decoder.vb_pool_config(4, 6)
vdec = vdecoder.Decoder(vdec_payload_type)
# Initialize display
if display_type == Display.VIRT:
Display.init(display_type, width=vdec_width, height=vdec_height, fps=30)
else:
Display.init(display_type, to_ide=True)
# vb buffer initialization
MediaManager.init()
# Create video decoder
vdec.create()
# vdec bind display
bind_info = vdec.bind_info(width=vdec_width, height=vdec_height, chn=vdec.get_vdec_channel())
Display.bind_layer(**bind_info, layer=Display.LAYER_VIDEO1)
vdec.start()
# Open file
with open(file_name, "rb") as fi:
while True:
os.exitpoint()
# Read video data stream
data = fi.read(STREAM_SIZE)
if not data:
break
# Decode data stream
vdec.decode(data)
# Stop video decoder
vdec.stop()
# Destroy video decoder
vdec.destroy()
time.sleep(1)
# Close display
Display.deinit()
# Release vb buffer
MediaManager.deinit()
print("vdec_test stop")
if **name** == "**main**":
os.exitpoint(os.EXITPOINT_ENABLE)
vdec_test("/sdcard/examples/test.264", 800, 480) # Decode 264/265 video file
```{admonition} Tip
For specific interface definitions, please refer to [VDEC](../../api/mpp/K230_CanMV_VDEC_Module_API_Manual.md)
2.5 RTSP Streaming#
This example demonstrates how to use the RTSP server to stream video and audio to the network.
# Description: This example demonstrates how to stream video and audio to the network using the RTSP server.
#
# Note: You will need an SD card to run this example.
#
# You can run the RTSP server to stream video and audio to the network
from media.vencoder import *
from media.sensor import *
from media.media import *
import time, os
import _thread
import multimedia as mm
from time import *
class RtspServer:
def __init__(self, session_name="test", port=8554, video_type=mm.multi_media_type.media_h264, enable_audio=False):
self.session_name = session_name # session name
self.video_type = video_type # Video type 264/265
self.enable_audio = enable_audio # Enable audio
self.port = port # RTSP port number
self.rtspserver = mm.rtsp_server() # Instantiate RTSP server
self.venc_chn = VENC_CHN_ID_0 # VENC channel
self.start_stream = False # Whether to start streaming thread
self.runthread_over = False # Whether streaming thread is over
def start(self):
# Initialize streaming
self._init_stream()
self.rtspserver.rtspserver_init(self.port)
# Create session
self.rtspserver.rtspserver_createsession(self.session_name, self.video_type, self.enable_audio)
# Start RTSP server
self.rtspserver.rtspserver_start()
self._start_stream()
# Start streaming thread
self.start_stream = True
_thread.start_new_thread(self._do_rtsp_stream, ())
def stop(self):
if not self.start_stream:
return
# Wait for streaming thread to exit
self.start_stream = False
while not self.runthread_over:
sleep(0.1)
self.runthread_over = False
# Stop streaming
self._stop_stream()
self.rtspserver.rtspserver_stop()
# self.rtspserver.rtspserver_destroysession(self.session_name)
self.rtspserver.rtspserver_deinit()
def get_rtsp_url(self):
return self.rtspserver.rtspserver_getrtspurl(self.session_name)
def _init_stream(self):
width = 1280
height = 720
width = ALIGN_UP(width, 16)
# Initialize sensor
self.sensor = Sensor()
self.sensor.reset()
self.sensor.set_framesize(width=width, height=height, alignment=12)
self.sensor.set_pixformat(Sensor.YUV420SP)
# Instantiate video encoder
self.encoder = Encoder()
self.encoder.SetOutBufs(self.venc_chn, 8, width, height)
# Bind camera and VENC
self.link = MediaManager.link(self.sensor.bind_info()['src'], (VIDEO_ENCODE_MOD_ID, VENC_DEV_ID, self.venc_chn))
# Init media manager
MediaManager.init()
# Create encoder
chnAttr = ChnAttrStr(self.encoder.PAYLOAD_TYPE_H264, self.encoder.H264_PROFILE_MAIN, width, height)
self.encoder.Create(self.venc_chn, chnAttr)
def _start_stream(self):
# Start encoding
self.encoder.Start(self.venc_chn)
# Start camera
self.sensor.run()
def _stop_stream(self):
# Stop camera
self.sensor.stop()
# Unbind camera and VENC
del self.link
# Stop encoding
self.encoder.Stop(self.venc_chn)
self.encoder.Destroy(self.venc_chn)
# Clean buffer
MediaManager.deinit()
def _do_rtsp_stream(self):
try:
streamData = StreamData()
while self.start_stream:
os.exitpoint()
# Get a frame stream
self.encoder.GetStream(self.venc_chn, streamData)
# Stream
for pack_idx in range(streamData.pack_cnt):
stream_data = bytes(uctypes.bytearray_at(streamData.data[pack_idx], streamData.data_size[pack_idx]))
self.rtspserver.rtspserver_sendvideodata(self.session_name, stream_data, streamData.data_size[pack_idx], 1000)
# print("stream size: ", streamData.data_size[pack_idx], "stream type: ", streamData.stream_type[pack_idx])
# Release a frame stream
self.encoder.ReleaseStream(self.venc_chn, streamData)
except BaseException as e:
print(f"Exception {e}")
finally:
self.runthread_over = True
# Stop RTSP server
self.stop()
self.runthread_over = True
if __name__ == "__main__":
os.exitpoint(os.EXITPOINT_ENABLE)
# Create RTSP server object
rtspserver = RtspServer()
# Start RTSP server
rtspserver.start()
# Print RTSP URL
print("RTSP server start:", rtspserver.get_rtsp_url())
# Stream for 60s
sleep(60)
# Stop RTSP server
rtspserver.stop()
print("done")
Tip
For specific interface definitions, please refer to RTSP