#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#  appCam.py
#  based on tutorial ==> https://blog.miguelgrinberg.com/post/video-streaming-with-flask
#  PiCam Local Web Server with Flask
#  MJRoBot.org 19Jan18
"""Flask web server that streams the Pi camera as MJPEG and drives the car.

Routes:
    /                 render the ``index-t.html`` template
    /startvadio       start the camera and stream multipart JPEG frames
    /stopvadio        stop and close the camera
    /capture          save the current frame to ``captured_photo.jpg``
    /control/         plain-text list of car commands
    /control/sendit   POST: forward a JSON ``message`` to the remote controller
    /control/<info>   call the public ``CAR`` method named ``info``
"""

import base64
import os
import threading
import time

import cv2
import requests
from flask import Flask, render_template, Response, redirect, url_for, send_file, jsonify, request
from gevent import pywsgi
from PIL import Image
# Raspberry Pi camera module (requires the picamera2 package)
from picamera2 import Picamera2

from car import CAR

app = Flask(__name__, static_url_path='')

# Photo save path. Not referenced by any route in this module; kept in case
# other code imports it.
PHOTO_PATH = "photo.jpg"

# File written by the /capture route.
CAPTURE_PATH = "captured_photo.jpg"

# Remote controller that /control/sendit forwards messages to.
REMOTE_IP = "192.168.60.204"
REMOTE_PORT = 3000
REMOTE_ROUTE = "sendit2"
# Seconds to wait for the remote controller; without a timeout an unreachable
# peer would block the request thread indefinitely.
REMOTE_TIMEOUT = 5

# Shared camera handle: None while the camera is off.
picam2 = None
# The server runs with threaded=True and /stopvadio closes the camera from a
# different request thread than the one streaming, so every access to
# ``picam2`` goes through this lock.
_camera_lock = threading.Lock()


def cameraon():
    """Create, configure and start the shared camera.

    Does nothing when the camera is already running, so opening a second
    stream does not try to construct a second ``Picamera2`` instance.
    """
    global picam2
    with _camera_lock:
        if picam2 is not None:
            return
        camera = Picamera2()
        try:
            camera.configure(camera.create_video_configuration(
                main={"format": 'XRGB8888', "size": (640, 480)}))
            camera.start()
        except Exception:
            # Do not leave a half-initialised camera open on failure.
            camera.close()
            raise
        picam2 = camera


def cameradown():
    """Stop and close the shared camera; a no-op when it is not running."""
    global picam2
    with _camera_lock:
        # Clear the global first so running streams see None and finish.
        camera, picam2 = picam2, None
        if camera is None:
            return
        try:
            camera.stop()
        finally:
            camera.close()


def gen_frames():
    """Yield camera frames as parts of a multipart MJPEG response.

    Starts the camera if needed and keeps yielding JPEG-encoded frames until
    the camera is shut down through ``cameradown()``.
    """
    cameraon()
    while True:
        with _camera_lock:
            camera = picam2
            if camera is None:
                # Camera was stopped (e.g. by /stopvadio): end the stream.
                return
            frame = camera.capture_array()
        # NOTE(review): confirm against the Picamera2 manual that XRGB8888
        # frames need this channel swap before OpenCV encoding.
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        encoded, buffer = cv2.imencode('.jpg', frame)
        if not encoded:
            # Skip a frame that could not be encoded instead of sending junk.
            continue
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n')


@app.route('/control/sendit', methods=['POST'])
def send_command():
    """Forward the ``message`` field of the JSON body to the remote controller.

    Returns:
        The remote server's reply on success, a 400 JSON error when
        ``message`` is missing, or a 500 text error when the forwarding
        request fails.
    """
    # silent=True: a missing or malformed JSON body becomes None rather than
    # an exception, so it is reported through the same 400 path below.
    data = request.get_json(silent=True)
    message = data.get('message') if isinstance(data, dict) else None
    if message is None:
        return jsonify({"error": "缺少 message 参数"}), 400

    url = f"http://{REMOTE_IP}:{REMOTE_PORT}/{REMOTE_ROUTE}"
    try:
        response = requests.post(url, json={'message': message},
                                 timeout=REMOTE_TIMEOUT)
        response.raise_for_status()  # treat HTTP error statuses as failures
    except requests.exceptions.RequestException as e:
        return f"请求失败: {str(e)}", 500
    return f"命令已发送,服务器响应: {response.text}"


@app.route('/capture')
def capture_image():
    """Grab one frame from the running camera and save it as a JPEG file.

    Returns:
        A confirmation string on success, a 409 response when the camera has
        not been started, or a 500 response when the file cannot be written.
    """
    with _camera_lock:
        camera = picam2
        if camera is None:
            return "Camera is not running; open /startvadio first", 409
        frame = camera.capture_array()
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    photo_path = CAPTURE_PATH
    try:
        saved = cv2.imwrite(photo_path, frame)
    except (cv2.error, OSError) as e:
        return f"Failed to save photo: {str(e)}", 500
    if not saved:
        # imwrite reports most failures by returning False, not by raising.
        return f"Failed to save photo: could not write {photo_path}", 500
    return f"Photo saved at {photo_path}"


@app.route('/')
def index():
    """Render the main page."""
    return render_template('index-t.html')


@app.route('/startvadio')
def startvadio():
    """Stream the camera; use this URL as the ``src`` of an ``<img>`` tag."""
    return Response(gen_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/stopvadio')
def stopvadio():
    """Shut the camera down, which also ends any running stream."""
    cameradown()
    return "closed"


car = CAR()


@app.route('/control/')
def control_index():
    """Print and return the plain-text list of supported car commands."""
    word = ("指令:\n"
            " /stop(/Q): 小车停止运动\n"
            " /forward(/W): 小车开始运动\n"
            " /back(/S): 小车向后运动\n"
            " /left(/A): 小车向左运动\n"
            " /right(/D): 小车向右运动\n")
    print(word)
    return word


@app.route('/control/<info>')
def fun(info):
    """Invoke the public ``CAR`` method named by the URL segment ``info``.

    Names starting with an underscore and non-callable attributes are
    rejected, so a client cannot trigger private or dunder members of the
    car object through the URL.

    Returns:
        ``'Run: <info>'`` after calling the method, or an error string with
        status 404 when no such command exists.
    """
    action = None if info.startswith('_') else getattr(car, info, None)
    if not callable(action):
        return 'Error: '+info+' not be defined\n', 404
    action()
    return 'Run: '+info+'\n'


if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution, and this server
    # listens on every interface, so debug mode is opt-in via FLASK_DEBUG.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    app.run(host='0.0.0.0', port=80, debug=debug_enabled, threaded=True)
    # Alternative: serve through gevent instead of the development server:
    #   pywsgi.WSGIServer(('0.0.0.0', 80), app).serve_forever()