Python 通过 Flask 框架构建 REST API(三)——基于 Token 的身份认证

接上文Python 通过 Flask 框架构建 REST API(二)——优化项目架构
前面介绍了如何通过 Flaskmarshmallow 框架写一个完整的架构清晰的项目,作为 REST API 实现基本的增删改查功能。
本篇主要介绍在前文的基础上,借助 JWT(JSON Web Tokens)创建基于 Token 的身份认证机制。

一、安装依赖

在前文创建的 Python 虚拟环境中,额外安装如下两个 Python 库:
$ pip install passlib flask-jwt-extended

其中 passlib 用来提供对明文密码的哈希处理及验证,flask-jwt-extended 则引入了对 JWT 认证的支持。

users 数据库模型

编辑 src/api/models/users.py 文件,创建 User 数据库模型和 UserSchema 序列化对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# src/api/models/users.py
from api.utils.database import db
from passlib.hash import pbkdf2_sha256 as sha256
from marshmallow_sqlalchemy import ModelSchema
from marshmallow import fields


class User(db.Model):
__tablename__ = 'users'

id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(120), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False)

def create(self):
db.session.add(self)
db.session.commit()
return self

@classmethod
def find_by_username(cls, username):
return cls.query.filter_by(username=username).first()

@staticmethod
def generate_hash(password):
return sha256.hash(password)

@staticmethod
def verify_hash(password, hash):
return sha256.verify(password, hash)


class UserSchema(ModelSchema):
class Meta(ModelSchema.Meta):
model = User
sqla_session = db.session

id = fields.Number(dump_only=True)
username = fields.String(required=True)

三、路由和响应逻辑

编辑 src/api/routes/users.py 文件,加入 users 路由和对应的 POST 响应逻辑。
其中 create_user() 用于创建用户并将用户信息存入数据库表,authenticate_user() 用于完成用户认证并返回 Token 字符串作为之后的访问令牌。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from flask import Blueprint
from flask import request
from flask import url_for, render_template_string
from api.utils.responses import response_with
from api.utils import responses as resp
from api.models.users import User, UserSchema
from api.utils.database import db
from flask_jwt_extended import create_access_token
import datetime

user_routes = Blueprint("user_routes", __name__)


@user_routes.route('/', methods=['POST'])
def create_user():
try:
data = request.get_json()
data['password'] = User.generate_hash(data['password'])
user_schmea = UserSchema()
user = user_schmea.load(data)
result = user_schmea.dump(user.create())
return response_with(resp.SUCCESS_201)
except Exception as e:
print(e)
return response_with(resp.INVALID_INPUT_422)


@user_routes.route('/login', methods=['POST'])
def authenticate_user():
try:
data = request.get_json()
current_user = User.find_by_username(data['username'])
if not current_user:
return response_with(resp.SERVER_ERROR_404)
if User.verify_hash(data['password'], current_user.password):
access_token = create_access_token(identity=data['username'])
return response_with(resp.SUCCESS_201, value={'message': 'Logged in as {}'.format(current_user.username), "access_token": access_token})
else:
return response_with(resp.UNAUTHORIZED_401)
except Exception as e:
print(e)
return response_with(resp.INVALID_INPUT_422)

四、配置

编辑 src/main.py 文件,添加如下两行代码注册 users 路由:

1
2
from api.routes.users import user_routes
app.register_blueprint(user_routes, url_prefix='/api/users')

main.pydb.init_app(app) 代码前添加如下内容初始化 JWT 模块:

1
2
from flask_jwt_extended import JWTManager
jwt = JWTManager(app)

编辑 src/api/config/config.py 文件,在 ProductionConfigDevelopmentConfigTestingConfig 添加 JWT_SECRET_KEY 定义:

1
JWT_SECRET_KEY = 'SOME-RADOM-JWT-SECRET'

编辑 src/api/utils/responses.py 文件,添加“未认证”和“禁止访问”的标准响应格式:

1
2
3
4
5
6
7
8
9
10
11
FORBIDDEN_403 = {
"http_code": 403,
"code": "notAuthorized",
"message": "You are not authorised to execute this."
}

UNAUTHORIZED_401 = {
"http_code": 401,
"code": "notAuthorized",
"message": "Invalid authentication."
}

src/api/routes/authors.pysrc/api/routes/books.py 两个路由文件的响应逻辑中,对 POST、PATCH、PUT 和 DELETE 请求加入验证 Token 的访问条件,以限制匿名用户对后台数据的修改。
只需要在响应函数前加入 @jwt_required 装饰器即可,如:

1
2
3
4
5
6
7
8
9
from flask_jwt_extended import jwt_required

@author_routes.route('/<int:id>', methods=['DELETE'])
@jwt_required
def delete_author(id):
get_author = Author.query.get_or_404(id)
db.session.delete(get_author)
db.session.commit()
return response_with(resp.SUCCESS_204)

五、测试

运行 python run.py,使用 httpie 工具进行测试。

创建用户:

1
2
3
4
$ http POST 127.0.0.1:5000/api/users/ username=admin password=flask
{
"code": "success"
}

用户登录获取 Token:

1
2
3
4
5
6
$ http POST 127.0.0.1:5000/api/users/login username=admin password=flask
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1NzUwODkyMTQsIm5iZiI6MTU3NTA4OTIxNCwianRpIjoiMTdiZjViZTEtMmJjMS00OWY2LWI2ZDgtZDRmZGZkZGE2MDI1IiwiZXhwIjoxNTc1MDkwMTE0LCJpZGVudGl0eSI6ImFkbWluIiwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.iipDvS-BFwOQ9IfybEZxn2adrBu7sC7MznDoXfnWEsI",
"code": "success",
"message": "Logged in as admin"
}

使用 POST 方法向 authors 数据表中插入作者信息:

1
2
3
4
5
6
7
8
9
10
$ http POST 127.0.0.1:5000/api/authors/ first_name=Jack last_name=Sparrow
HTTP/1.0 401 UNAUTHORIZED
Content-Length: 44
Content-Type: application/json
Date: Sat, 30 Nov 2019 04:53:15 GMT
Server: Werkzeug/0.16.0 Python/3.7.3

{
"msg": "Missing Authorization Header"
}

提示缺少 Authorization 请求头,即 POST 请求需要添加 Token 认证信息。

添加 Authorization 请求头(Token),重新发起 POST 请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
$ http POST 127.0.0.1:5000/api/authors/ Authorization:"Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE1NzUwODkyMTQsIm5iZiI6MTU3NTA4OTIxNCwianRpIjoiMTdiZjViZTEtMmJjMS00OWY2LWI2ZDgtZDRmZGZkZGE2MDI1IiwiZXhwIjoxNTc1MDkwMTE0LCJpZGVudGl0eSI6ImFkbWluIiwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.iipDvS-BFwOQ9IfybEZxn2adrBu7sC7MznDoXfnWEsI" first_name=Jack last_name=Sparrow
HTTP/1.0 201 CREATED
Access-Control-Allow-Origin: *
Content-Length: 171
Content-Type: application/json
Date: Sat, 30 Nov 2019 04:57:05 GMT
server: Flask REST API

{
"author": {
"books": [],
"created": "2019-11-30 04:57:05",
"first_name": "Jack",
"id": 1.0,
"last_name": "Sparrow"
},
"code": "success"
}

成功创建数据。

注意 Authorization 请求头的格式为 Authorization:"Bearer <token>"

参考资料

Building REST APIs with Flask