from app.libs.redprint import Redprint
from app.libs.token_auth import auth
# Redprint instance named 'user'; the route decorators below register views on it.
api = Redprint('user')
# @api.route('/get')  -- rejected: a URL should not contain a verb
@api.route('', methods=['GET'])
@auth.login_required
def get_user():
    """Placeholder view registered on the Redprint's empty ('') route.

    The ``auth.login_required`` decorator guards the view, so the request
    has to pass the auth handler before this body runs.

    Returns:
        str: the literal text ``'get_user'``.
    """
    return 'get_user'
# Build a decorator that verifies the token
from flask_httpauth import HTTPBasicAuth
from flask import current_app, g
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, \
SignatureExpired
from collections import namedtuple
from app.libs.erro_code import AuthFailed
# HTTP Basic auth handler; its verify_password callback is registered further down.
auth = HTTPBasicAuth()
# Lightweight record for the identity carried by a token: user id, account
# type and scope (verify_auth_token builds one from the decoded payload).
User = namedtuple('User', ['uid', 'ac_type', 'scope'])
# @auth.verify_password
# def verify_password(account, password):
# # The HTTP request header must carry a fixed key/value pair:
# # key=Authorization, value=basic base64(account:psd)
# # imwl@live.com:12345678 encodes to aW13bEBsaXZlLmNvbToxMjM0NTY3OA==
# # key=Authorization,value=basic aW13bEBsaXZlLmNvbToxMjM0NTY3OA==
# return True
@auth.verify_password
def verify_password(token, password):
    """Auth callback: treat the Basic-auth "account" field as a token.

    Args:
        token: value received in the account position; it is handed to
            ``verify_auth_token`` for decoding.
        password: unused; present because the callback signature requires it.

    Returns:
        bool: ``True`` after the decoded identity has been stored on
        ``g.user``; ``False`` when ``verify_auth_token`` yields a falsy value.
    """
    identity = verify_auth_token(token)
    if identity:
        # Stash the identity on flask.g so later code in this request can read it.
        g.user = identity
        return True
    return False
def verify_auth_token(token):
    """Decode a signed token and return the identity it carries.

    The token is loaded with a serializer keyed by the application's
    ``SECRET_KEY`` configuration value.

    Args:
        token: the serialized token string to verify.

    Returns:
        User: namedtuple ``(uid, ac_type, scope)`` built from the payload's
        ``'uid'`` and ``'type'`` entries; ``scope`` is an empty string for now.

    Raises:
        AuthFailed: with ``erro_code=1003`` when the token has expired, or
            ``erro_code=1002`` when its signature is otherwise invalid.
    """
    s = Serializer(current_app.config['SECRET_KEY'])
    try:
        data = s.loads(token)
    # SignatureExpired is a subclass of BadSignature, so it has to be caught
    # first; otherwise the expired branch can never be reached.
    except SignatureExpired:
        raise AuthFailed(msg='token is expired', erro_code=1003)
    except BadSignature:
        raise AuthFailed(msg='token is invalid', erro_code=1002)
    uid = data['uid']
    ac_type = data['type']
    return User(uid, ac_type, '')
from app.libs.redprint import Redprint
from app.libs.token_auth import auth
from app.models.user import User
from app.models.base import db
from flask import jsonify
# Redprint instance named 'user'; the route decorators below register views on it.
api = Redprint('user')
# @api.route('/get')  -- rejected: a URL should not contain a verb
@api.route('/<int:uid>', methods=['GET'])
@auth.login_required
def get_user(uid):
    """Return the profile of the user with primary key ``uid`` as JSON.

    Args:
        uid: integer user id captured from the URL.

    Returns:
        A JSON response with the user's ``nickname`` and ``email``.

    Raises:
        Whatever ``User.query.get_or_404`` raises when no row matches.
        NOTE(review): presumably the project's overridden ``get_or_404``
        (raising an API-style NotFound) - confirm against app.models.
    """
    user = User.query.get_or_404(uid)
    profile = {
        'nickname': user.nickname,
        'email': user.email,
        # The password is deliberately left out: credentials must never be
        # echoed back in an API response.
    }
    # The view has to return a response; falling off the end would hand
    # Flask ``None``, which it rejects.
    return jsonify(profile)
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
from sqlalchemy import Column, Integer, SmallInteger
from contextlib import contextmanager
class SQLAlchemy(_SQLAlchemy):
    """Flask-SQLAlchemy extension object with an auto-committing context manager."""

    @contextmanager
    def auto_commit(self):
        """Commit the session if the ``with`` body succeeds, else roll back.

        Usage::

            with db.auto_commit():
                db.session.add(obj)

        Raises:
            Exception: whatever the body or the commit raised, re-raised
                unchanged after the session has been rolled back.
        """
        try:
            yield
            self.session.commit()
        except Exception:
            # Roll back on this instance's own session (the one the commit
            # used) instead of reaching for the module-level ``db`` object.
            self.session.rollback()
            raise
from app.libs.erro_code import NotFound
class Query(BaseQuery):
    """Query class that filters on ``status`` by default and raises ``NotFound``."""

    def filter_by(self, **kwargs):
        """Filter by keyword criteria, adding ``status=1`` unless the caller set ``status``."""
        kwargs.setdefault('status', 1)
        return super().filter_by(**kwargs)

    # Modelled on the upstream implementation; overrides the stock get_or_404.
    def get_or_404(self, ident):
        """Return the row whose identifier is ``ident``; raise ``NotFound`` if there is none."""
        found = self.get(ident)
        if found is not None:
            return found
        raise NotFound()

    def first_or_404(self):
        """Return the first result of this query; raise ``NotFound`` if there is none."""
        found = self.first()
        if found is not None:
            return found
        raise NotFound()
# Database object for the application, configured with the custom Query
# class above as its query_class.
db = SQLAlchemy(query_class=Query)
class Base(db.Model):
    """Abstract model base providing a creation timestamp and a status flag."""

    __abstract__ = True

    # Creation time stored as an integer timestamp (seconds).
    create_time = Column(Integer)
    # Status flag; defaults to 1 and is set to 0 by delete().
    status = Column(SmallInteger, default=1)

    def __init__(self):
        """Stamp the new instance with the current time."""
        now = datetime.now()
        self.create_time = int(now.timestamp())

    @property
    def create_datetime(self):
        """``create_time`` as a ``datetime``, or ``None`` when it is unset/falsy."""
        if not self.create_time:
            return None
        return datetime.fromtimestamp(self.create_time)

    def set_attrs(self, attrs_dict):
        """Copy entries of ``attrs_dict`` onto matching attributes, never touching ``id``."""
        for name, new_value in attrs_dict.items():
            if not hasattr(self, name) or name == 'id':
                continue
            setattr(self, name, new_value)

    def delete(self):
        """Mark the record as deleted by setting ``status`` to 0."""
        self.status = 0
@api.route('/secret', methods=['POST'])
def get_token_info():
    """Decode a submitted token and report what it contains as JSON.

    Returns:
        A JSON response with ``scope`` and ``uid`` taken from the token
        payload, and ``create_at`` / ``expire_in`` taken from the ``iat`` /
        ``exp`` entries of its header.

    Raises:
        AuthFailed: code 1003 for an expired token, 1002 for an invalid one.
    """
    form = TokenForm().validate_for_api()
    serializer = Serializer(current_app.config['SECRET_KEY'])
    try:
        decoded = serializer.loads(form.token.data, return_header=True)
    # NOTE(review): other call sites in this file pass ``erro_code=`` to
    # AuthFailed; confirm which keyword the exception actually accepts.
    except SignatureExpired:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except BadSignature:
        raise AuthFailed(msg='token is invalid', error_code=1002)
    # With return_header=True the result is a (payload, header) pair.
    payload, header = decoded[0], decoded[1]
    info = {
        'scope': payload['scope'],
        'create_at': header['iat'],
        'expire_in': header['exp'],
        'uid': payload['uid']
    }
    return jsonify(info)