提交 95ed0a58 authored 作者: 李学军's avatar 李学军

Initial commit

上级
import datetime
import json
from flask import Flask, request, jsonify, make_response
from flask_cors import CORS
import os
import uuid
import redis
from flask_socketio import SocketIO, emit
import couchdb
# 步骤 1:创建 Server 对象
server = couchdb.Server('http://admin:M36j44l*.*@117.72.90.129:5984/') # 替换为实际 IP/域名‌
# 步骤 2:添加用户名密码验证
# server.resource.http.add_credentials('admin', 'M36j44l*.*')
# 步骤 3:连接指定数据库
try:
db = server['jianli'] # 若数据库不存在会抛出 couchdb.ResourceNotFound 异常}
print("连接成功!")
except couchdb.Unauthorized:
print("认证失败,请检查用户名/密码")
except couchdb.ResourceNotFound:
print("数据库不存在")
# 基本连接配置
client = redis.StrictRedis(
host='117.72.90.129',
port=6379,
db=0,
password='m36j44l*.*', # 可选(若启用密码)
socket_timeout=5 # 设置超时时间(秒)
)
# 测试连接
try:
if client.ping():
print("Redis 连接成功")
except redis.ConnectionError:
print("连接失败")
app = Flask(__name__)
CORS(app) # 解决跨域问题
socketio = SocketIO(app, cors_allowed_origins="*") # 允许跨域
# 初始化(例如加载模型)
# model = load_model() # 这里可以加载你的 PyTorch 模型
@app.route('/')
def index():
return "Welcome to Flask API"
@app.errorhandler(404)
def not_found(error):
return make_response(jsonify({'error': 'Not found'}), 404)
@app.errorhandler(413)
def request_entity_too_large(error):
return make_response(jsonify({'error': 'File too large'}), 413)
@app.route('/getInfo', methods=['GET'])
def getInfo():
matched_keys = []
cursor = 0
while True:
cursor, keys = client.scan(cursor, match="Device:*")
for key in keys:
matched_keys.append(key.decode('utf-8'))
if cursor == 0:
break
print(matched_keys)
values =[]
for k in matched_keys:
value = client.get(k)
if value:
values.append(value.decode('utf-8'))
print(values)
return jsonify(values)
@app.route('/getcgq', methods=['GET'])
def getcgq():
values = client.get("CGQ")
decoded_value = values.decode('utf-8')
return jsonify(decoded_value)
@socketio.on('message') # 监听客户端发送的 'message' 事件
def handle_message(data):
print('Received:', data)
emit('response', {'data': 'Server reply'}) # 向客户端发送响应
if __name__ == '__main__':
# 启动服务(生产环境建议使用 Gunicorn 等服务器)
app.run(host='0.0.0.0', port=5000, debug=True)
\ No newline at end of file
import cv2
import time
def read_http_stream(url):
cap = cv2.VideoCapture(url)
if not cap.isOpened():
print("can't open capture")
return None
try:
while True:
ret, frame = cap.read()
if not ret:
print("帧丢失,尝试恢复...")
cap.release()
cap = cv2.VideoCapture(url)
time.sleep(2)
continue
# 自定义处理逻辑(如保存帧)
cv2.imshow('Stream', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
except Exception as e:
print(f"异常: {e}")
finally:
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
stream_url = "http://192.168.1.141:8899/video_feed"
read_http_stream(stream_url)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论