首页 >> 大全

JWT-RESTful进行身份认证

2023-11-14 大全 25 作者:考证青年

视频链接

文章目录 JWT的实现 Go语言实现 JWT 更新状态问题(白名单) 总结

服务器存储与客户端存储 基于服务器的身份认证方式存在一些问题: 客户端存储

JWT与Session的差异 相同点是,它们都是存储用户信息;然而,Session是在服务器端的,而JWT是在客户端的。

Session方式存储用户信息的最大问题在于要占用大量服务器内存,增加服务器的开销。

而JWT方式将用户状态分散到了客户端中,可以明显减轻服务端的内存压力。

Session的状态是存储在服务器端,客户端只有session id;而Token的状态是存储在客户端。

JWT的实现

Python语言实现

# -*- coding: utf8 -*-
"""JWT helpers for a Flask application.

Wraps PyJWT in ``encode_token`` / ``decode_token`` and exposes
``current_auth``, a proxy over the custom claims carried by the token in
the current request's ``Authorization`` header.
"""
import datetime
import os
from typing import Optional

import jwt
from flask import current_app, request, has_app_context, _app_ctx_stack
from werkzeug.local import LocalProxy

from . import exceptions

# Registered claim names (RFC 7519); decode_token strips these from its result.
registered_claims = {'iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti'}

# Single signing algorithm, pinned for both encoding and decoding.
_ALGORITHM = 'HS256'


class _AuthObject:
    """Attribute bag built from keyword arguments.

    Attributes that were never set read as ``None`` instead of raising
    ``AttributeError``.
    """

    def __init__(self, **kwargs):
        for key, val in kwargs.items():
            setattr(self, key, val)

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup fails, i.e. for unset names.
        return None

    def __repr__(self):
        return str(self.__dict__)


def encode_token(iss: Optional[str] = None,
                 expire: Optional[datetime.timedelta] = None,
                 **kwargs):
    """Encode a JWT signed with the ``SECRET_KEY`` environment variable.

    :param iss: issuer claim; falls back to ``'website.com'`` when empty
    :param expire: timedelta added to ``iat`` to produce the ``exp`` claim;
        no ``exp`` claim is set when omitted
    :param kwargs: additional claims merged into the payload
    :return: the encoded token as returned by ``jwt.encode``
    :raises exceptions.Internal: if building or signing the token fails
    """
    try:
        payload = {
            'iat': datetime.datetime.utcnow(),
            'iss': iss or 'website.com',
        }
        # Merge caller-supplied claims.
        payload.update(**kwargs)
        # Only set an expiry when the caller asked for one.
        if expire:
            payload['exp'] = payload['iat'] + expire
        # Pass the algorithm explicitly; PyJWT then writes the standard
        # 'alg'/'typ' header fields itself (the former custom header used the
        # non-standard keys 'algorithm'/'type').
        return jwt.encode(payload, os.getenv('SECRET_KEY'), algorithm=_ALGORITHM)
    except Exception as exc:
        raise exceptions.Internal(message='无效token') from exc


def decode_token(token, verify_exp: bool = False):
    """Decode a JWT and return only its non-registered claims.

    :param token: the encoded token
    :param verify_exp: whether to enforce the ``exp`` claim
    :return: dict of claims whose names are not in ``registered_claims``
    :raises exceptions.Unauthenticated: if the token is expired (only when
        ``verify_exp`` is true) or otherwise invalid
    """
    # NOTE(review): verify_exp defaults to False, so expired tokens are
    # accepted unless the caller opts in — confirm this is intended.
    try:
        payload = jwt.decode(
            token,
            os.getenv('SECRET_KEY'),
            # Pin the accepted algorithm: required by PyJWT 2.x.
            algorithms=[_ALGORITHM],
            options={'verify_exp': verify_exp},
        )
    except jwt.ExpiredSignatureError as exc:
        raise exceptions.Unauthenticated(message='token已过期') from exc
    except jwt.InvalidTokenError as exc:
        raise exceptions.Unauthenticated(message='无效token') from exc
    # Keep only the custom claims.
    return {
        field: value
        for field, value in payload.items()
        if field not in registered_claims
    }


def _get_auth():
    """Return the ``_AuthObject`` for the current app context, building it on first use.

    Expects an ``Authorization`` header of the form ``JWT <token>``.

    :raises RuntimeError: if called outside an application context
    :raises exceptions.InvalidArgument: if the header is missing or malformed
    :raises exceptions.Unauthenticated: if the token cannot be decoded
    """
    if not has_app_context():
        raise RuntimeError('No application found. Either work inside a view function or push'
                           ' an application context.')
    # NOTE(review): _app_ctx_stack is deprecated in newer Flask releases;
    # flask.g is the usual replacement — confirm the targeted Flask version.
    if not hasattr(_app_ctx_stack.top, 'auth'):
        # Get and validate the auth field in the request header.
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            raise exceptions.InvalidArgument(message="请求头错误")
        auth_attr = auth_header.split(' ')
        if not auth_attr or auth_attr[0] != 'JWT' or len(auth_attr) != 2:
            raise exceptions.InvalidArgument(message="token格式错误")
        payload = decode_token(auth_attr[1])
        # Cache the auth data on the app context for later accesses.
        _app_ctx_stack.top.auth = _AuthObject(**payload)
    return getattr(_app_ctx_stack.top, 'auth')


# Global proxy: resolves to the current context's auth object on each access.
current_auth = LocalProxy(lambda: _get_auth())

Flask 与

flask 一般在请求钩子中设置 或者 在调用时加入

jwt 使用装饰器

init.py

# -*- coding: utf8 -*-
"""
Permission management, implemented with JWT.
"""
from .permission import auth_control, current_auth

# Names re-exported as this package's public API.
__all__ = [
    'auth_control',
    'current_auth',
]

# -*- coding: utf8 -*-
"""JWT helpers for a Flask application.

Wraps PyJWT in ``encode_token`` / ``decode_token`` (signed with the Flask
app's ``SECRET_KEY`` config value) and exposes ``current_auth``, a proxy
over the custom claims carried by the token in the current request's
``Authorization`` header.
"""
import datetime
from typing import Optional

import jwt
from flask import current_app, request, has_app_context, _app_ctx_stack
from werkzeug.local import LocalProxy

from . import exceptions

# Registered claim names (RFC 7519); decode_token strips these from its result.
registered_claims = {'iss', 'sub', 'aud', 'exp', 'nbf', 'iat', 'jti'}

# Single signing algorithm, pinned for both encoding and decoding.
_ALGORITHM = 'HS256'


class _AuthObject:
    """Attribute bag built from keyword arguments.

    Attributes that were never set read as ``None`` instead of raising
    ``AttributeError``.
    """

    def __init__(self, **kwargs):
        for key, val in kwargs.items():
            setattr(self, key, val)

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup fails, i.e. for unset names.
        return None

    def __repr__(self):
        return str(self.__dict__)


def encode_token(iss: Optional[str] = None,
                 expire: Optional[datetime.timedelta] = None,
                 **kwargs):
    """Encode a JWT signed with ``current_app.config['SECRET_KEY']``.

    :param iss: issuer claim; falls back to ``'startask'`` when empty
    :param expire: timedelta added to ``iat`` to produce the ``exp`` claim;
        no ``exp`` claim is set when omitted
    :param kwargs: additional claims merged into the payload
    :return: the encoded token as returned by ``jwt.encode``
    :raises exceptions.Internal: if building or signing the token fails
    """
    try:
        payload = {
            'iat': datetime.datetime.utcnow(),
            'iss': iss or 'startask',
        }
        # Merge caller-supplied claims.
        payload.update(**kwargs)
        # Only set an expiry when the caller asked for one.
        if expire:
            payload['exp'] = payload['iat'] + expire
        # Pass the algorithm explicitly; PyJWT then writes the standard
        # 'alg'/'typ' header fields itself (the former custom header used the
        # non-standard keys 'algorithm'/'type').
        return jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm=_ALGORITHM)
    except Exception as exc:
        raise exceptions.Internal(message='无效token') from exc


def decode_token(token, verify_exp: bool = False):
    """Decode a JWT and return only its non-registered claims.

    :param token: the encoded token
    :param verify_exp: whether to enforce the ``exp`` claim
    :return: dict of claims whose names are not in ``registered_claims``
    :raises exceptions.Unauthenticated: if the token is expired (only when
        ``verify_exp`` is true) or otherwise invalid
    """
    # NOTE(review): verify_exp defaults to False, so expired tokens are
    # accepted unless the caller opts in — confirm this is intended.
    try:
        payload = jwt.decode(
            token,
            current_app.config['SECRET_KEY'],
            # Pin the accepted algorithm: required by PyJWT 2.x.
            algorithms=[_ALGORITHM],
            options={'verify_exp': verify_exp},
        )
    except jwt.ExpiredSignatureError as exc:
        raise exceptions.Unauthenticated(message='token已过期') from exc
    except jwt.InvalidTokenError as exc:
        raise exceptions.Unauthenticated(message='无效token') from exc
    # Keep only the custom claims.
    return {
        field: value
        for field, value in payload.items()
        if field not in registered_claims
    }


def _get_auth():
    """Return the ``_AuthObject`` for the current app context, building it on first use.

    Expects an ``Authorization`` header of the form ``JWT <token>``.

    :raises RuntimeError: if called outside an application context
    :raises exceptions.InvalidArgument: if the header is missing or malformed
    :raises exceptions.Unauthenticated: if the token cannot be decoded
    """
    if not has_app_context():
        raise RuntimeError('No application found. Either work inside a view function or push'
                           ' an application context.')
    # NOTE(review): _app_ctx_stack is deprecated in newer Flask releases;
    # flask.g is the usual replacement — confirm the targeted Flask version.
    if not hasattr(_app_ctx_stack.top, 'auth'):
        # Get and validate the auth field in the request header.
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            raise exceptions.InvalidArgument(message="请求头错误")
        auth_attr = auth_header.split(' ')
        if not auth_attr or auth_attr[0] != 'JWT' or len(auth_attr) != 2:
            raise exceptions.InvalidArgument(message="token格式错误")
        payload = decode_token(auth_attr[1])
        # Cache the auth data on the app context for later accesses.
        _app_ctx_stack.top.auth = _AuthObject(**payload)
    return getattr(_app_ctx_stack.top, 'auth')


# Global proxy: resolves to the current context's auth object on each access.
current_auth = LocalProxy(lambda: _get_auth())

直接调用 该方法

权限控制装饰器

# -*- coding: utf8 -*-
"""Role-based access-control decorator built on the JWT ``current_auth`` proxy."""
from functools import wraps
from typing import Optional, List, Tuple

from flask import current_app

from titan.user.enum import RoleEnum
from titan.factory import redis
from titan.cache import SignOutUserListKey
from stardust.exceptions import PermissionDenied, Unauthenticated
from stardust.auth import current_auth


def _access_control(auth_roles: Optional[List[RoleEnum]] = None) -> Tuple[str, RoleEnum]:
    """Validate the current token's claims against the allowed roles.

    :param auth_roles: roles allowed to proceed; defaults to
        ``[RoleEnum.DEFAULT]``. ``RoleEnum.ADMIN`` always passes the role check.
    :return: ``(id, role)`` taken from the token claims
    :raises PermissionDenied: if the id is a member of the signed-out set in redis
    :raises Unauthenticated: if the role claim is missing/unknown, the role is
        not allowed, or the token's ``env`` claim differs from
        ``current_app.config['ENV']``
    """
    id_ = current_auth.id
    try:
        user_role = RoleEnum(current_auth.role)
    except ValueError as exc:
        # A missing claim reads as None and an unknown value is not a valid
        # RoleEnum member; report an auth failure instead of an unhandled error.
        raise Unauthenticated(message="无权访问") from exc
    env = current_auth.env

    # Reject users recorded in the signed-out set.
    if redis.sismember(SignOutUserListKey, id_):
        raise PermissionDenied(message='请重新登录')

    if not auth_roles:
        auth_roles = [RoleEnum.DEFAULT]
    if user_role != RoleEnum.ADMIN and user_role not in auth_roles:
        raise Unauthenticated(message="无权访问")

    # The token must have been issued for the environment this app runs in.
    if current_app.config['ENV'] != env:
        raise Unauthenticated(message="环境有误,请重新登录")
    return id_, user_role


def auth_control(roles: Optional[List[RoleEnum]] = None):
    """Decorator factory enforcing ``_access_control`` before the wrapped function.

    The wrapped function is called with the user id and role prepended as its
    first two positional arguments.

    :param roles: roles allowed to call the wrapped function
    """
    def wrapper(func):
        @wraps(func)
        def wrap_func(*args, **kwargs):
            id_, user_role = _access_control(roles)
            return func(id_, user_role, *args, **kwargs)
        return wrap_func
    return wrapper

@blueprint.route('/summary', methods=['GET'])
@auth_control()
def task_issue_summary(*args, **kwargs):
    """Handle ``GET /summary``: return the issue summary frame indexes for a task.

    Positional arguments injected by ``auth_control`` are accepted but unused;
    the user id is read from ``current_auth`` instead.
    """
    # Fields the request must carry (presumably enforced by variable_check).
    required_fields = {'task_id', 'container_id'}

    # Fetch the request data and validate it.
    request_pb = api_request(issue_pb2.AnnotationIssueSummeryRequestProto)
    variable_check(request_pb, required_fields)

    frame_indexes = controller.task_issue_summary(
        current_auth.id,
        request_pb.task_id,
        request_pb.container_id,
        request_pb.sampling_id,
    )
    return api_response(
        issue_pb2.AnnotationIssueSummaryResponseProto,
        {'frame_indexes': frame_indexes},
    )

Go语言实现

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/dgrijalva/jwt-go"
	"github.com/gin-gonic/gin"
)

// jwtkey is the HMAC secret used to sign and verify tokens.
// NOTE(review): hard-coded for this demo; load it from configuration in real use.
var jwtkey = []byte("www.topgoer.com")

// str holds the most recently issued token string (written by setting).
// NOTE(review): written from request handlers without synchronization.
var str string

// Claims is the token payload: a custom UserId plus the standard registered claims.
type Claims struct {
	UserId uint
	jwt.StandardClaims
}

func main() {
	r := gin.Default()
	r.GET("/set", setting)
	r.GET("/get", getting)
	// Listen on port 8080.
	r.Run(":8080")
}

// setting issues a token valid for 7 days and returns it as JSON.
func setting(ctx *gin.Context) {
	expireTime := time.Now().Add(7 * 24 * time.Hour)
	claims := &Claims{
		UserId: 2,
		StandardClaims: jwt.StandardClaims{
			ExpiresAt: expireTime.Unix(), // expiry time
			IssuedAt:  time.Now().Unix(),
			Issuer:    "127.0.0.1",  // token issuer
			Subject:   "user token", // token subject
		},
	}
	fmt.Println(claims)
	token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
	fmt.Println(token)
	tokenString, err := token.SignedString(jwtkey)
	fmt.Println(tokenString)
	if err != nil {
		// Signing failed: report it instead of answering 200 with an empty token.
		fmt.Println(err)
		ctx.JSON(http.StatusInternalServerError, gin.H{"code": 500, "msg": "token生成失败"})
		ctx.Abort()
		return
	}
	str = tokenString
	ctx.JSON(200, gin.H{"token": tokenString})
}

// getting parses the token from the Authorization header and prints its UserId.
// It answers 401 when the header is empty or the token fails to parse/validate.
func getting(ctx *gin.Context) {
	tokenString := ctx.GetHeader("Authorization")
	// Validate token format.
	if tokenString == "" {
		ctx.JSON(http.StatusUnauthorized, gin.H{"code": 401, "msg": "权限不足"})
		ctx.Abort()
		return
	}
	token, claims, err := ParseToken(tokenString)
	if err != nil || !token.Valid {
		ctx.JSON(http.StatusUnauthorized, gin.H{"code": 401, "msg": "权限不足"})
		ctx.Abort()
		return
	}
	fmt.Println(111)
	fmt.Println(claims.UserId)
}

// ParseToken parses tokenString into Claims, verifying it with jwtkey.
// It returns the parsed token, the populated claims, and any parse error.
func ParseToken(tokenString string) (*jwt.Token, *Claims, error) {
	// Named in lower case so the variable does not shadow the Claims type.
	claims := &Claims{}
	token, err := jwt.ParseWithClaims(tokenString, claims, func(token *jwt.Token) (interface{}, error) {
		// Only accept HMAC-signed tokens; the key is an HMAC secret.
		if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
		}
		return jwtkey, nil
	})
	fmt.Println(token, claims)
	return token, claims, err
}

获取token

JWT 更新状态问题(白名单)

如何让用户无感知获取最新token

参考

总结 优点

因为json的通用性,所以JWT是可以进行跨语言支持的,像JAVA,JavaScript,NodeJS,PHP等很多语言都可以使用。

因为有了payload部分,所以JWT可以在自身存储一些其他业务逻辑所必要的非敏感信息。

便于传输,jwt的构成非常简单,字节占用很小,所以它是非常便于传输的。

它不需要在服务端保存会话信息, 所以它易于应用的扩展

安全相关

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了