vault backup: 2026-04-15 21:05:29

This commit is contained in:
2026-04-15 21:05:29 +08:00
parent 18fc94823e
commit 38e0dba063

View File

@@ -221,6 +221,225 @@
- Server后端相关文档。
# 需求
1. 在保证相关接口不变的情况下,重构./backend 中的服务端内容。
2. 根据需求迭代./frontend
1. 简单登录预留OIDC登录。
2.
3. 简单登录预留OIDC登录。
4.
## OIDC 参考代码
```python
#coding:UTF-8
"""
Requirements:
1. Flask >= 0.10.1
2. requests
3. jwkest >= 1.1.7
Usage:
python oidc_code_demo.py -H {listen_address} -p {listen_port}
Help:
python oidc_code_demo.py -h
"""
import os
import uuid
from hashlib import md5
import datetime
from urllib import urlencode
import json
import requests
from flask import Flask, request, jsonify, session, redirect
from jwkest.jwk import SYMKey
from jwkest.jws import JWS
from jwkest.jwk import load_jwks_from_url
from jwkest.jws import NoSuitableSigningKeys
__revision__ = "0.01"
__author__ = "chenxs@corp.netease.com"
OIDC_CLIENT_ID = ""
OIDC_CLIENT_SECRET = ""
OIDC_PROVIDER = "https://login.netease.com/connect"
OIDC_AUTHORIZATION_SERVER = "https://login.netease.com/connect/authorize"
OIDC_TOKEN_ENDPOINT = "https://login.netease.com/connect/token"
OIDC_USERINFO_ENDPOINT = "https://login.netease.com/connect/userinfo"
OIDC_SCOPE = "openid nickname email fullname dep title empno"
OIDC_REDIRECT_URI = "https://127.0.0.1:5000/finish"
OIDC_JWKS_URI = "https://login.netease.com/connect/jwks"
OIDC_ALG = "HS256"
PYTHON_OIDC_DEMO = Flask(__name__)
@PYTHON_OIDC_DEMO.route("/", methods=['GET'])
def index():
"""index"""
if 'username' in session:
body = (
u"</br><h2>OpenID Connect 鐧诲綍鎴愬姛銆<E5A79B></h2></br>"
u"鎮ㄧ殑鐢ㄦ埛鍚嶆槸锛<EFBFBD>%s</br>"
u"鎮ㄧ殑鍏ㄥ悕鏄細%s</br>"
u"鎮ㄧ殑閭鏄細%s</br>"
u"鎮ㄧ殑鑱屼綅鏄細%s</br>"
u"鎮ㄧ殑閮ㄩ棬鏄細%s</br>"
u"鎮ㄧ殑宸ュ彿鏄細%s</br>") % (
session['username'], session.get('fullname', ''),
session.get('email', ''), session.get('title', ''),
session.get('dep', ''), session.get('empno', ''))
body += u"<a href='/login'>鎴虫垜閲嶆柊鐧诲綍</a>"
return body
else:
return u"<a href='/login'>鎴虫垜鐧诲綍</a>"
@PYTHON_OIDC_DEMO.route("/login", methods=['GET'])
def login():
"""AuthN Request"""
session.clear()
now = datetime.datetime.now().strftime("%s")
session['uid'] = uuid.uuid4().hex
session['state'] = session['uid']
session['nonce'] = md5(session['uid'] + now).hexdigest()
authn_request_params = {
'response_type': 'code',
'client_id': OIDC_CLIENT_ID,
'state': session['state'],
'nonce': session['nonce'],
'scope': OIDC_SCOPE,
'redirect_uri': OIDC_REDIRECT_URI,
#'prompt': 'login',
'display': 'touch',
}
redirect_url = "?".join([
OIDC_AUTHORIZATION_SERVER, urlencode(authn_request_params)])
return redirect(redirect_url)
def token_request(code):
"""2. Token Request"""
params = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': OIDC_REDIRECT_URI,
'client_id': OIDC_CLIENT_ID,
'client_secret': OIDC_CLIENT_SECRET,
}
_resp = requests.post(OIDC_TOKEN_ENDPOINT, data=params)
return json.loads(_resp.text)
def id_token_verify(id_token, nonce=None):
"""3. id token verify"""
now = int(datetime.datetime.now().strftime("%s"))
if OIDC_ALG == "HS256":
signed_keys = [SYMKey(key=OIDC_CLIENT_SECRET)]
else:
signed_keys = load_jwks_from_url(OIDC_JWKS_URI)
try:
plain_id_token = JWS().verify_compact(id_token, signed_keys)
except NoSuitableSigningKeys:
# logger the id_token please
return {'error': 'can not verify the id token'}
print "idtoken: %s" % plain_id_token
if nonce:
if (not plain_id_token.has_key('nonce')) or (
plain_id_token['nonce'] != nonce):
return {'error': 'id token nonce not correct'}
if plain_id_token['iss'] != OIDC_PROVIDER:
return {'error': 'id token iss not correct'}
if plain_id_token['aud'] != OIDC_CLIENT_ID:
return {'error': 'id token aud not correct'}
if now >= int(plain_id_token['exp']):
return {'error': 'id token expired'}
return {'id_token': plain_id_token}
@PYTHON_OIDC_DEMO.route("/finish", methods=['GET'])
def finish():
"""
1. AuthN Response
2. Token Request
3. id token verify
4. userinfo request
5. login user
"""
# 1. AuthN Response
try:
code = request.args.get('code')
if session['state']:
state = request.args.get('state')
if state != session['state']:
return u"闈炴硶璇锋眰"
except ValueError:
return u"闈炴硶璇锋眰"
# 2. Token Request
token = token_request(code)
print "token: %s" % token
if token.has_key('error'):
return u"鍑洪敊浜嗭細%s" % str(token)
# 3. id token verify
id_token_verified = id_token_verify(token['id_token'])
if id_token_verified.has_key('error'):
return id_token_verified['error']
else:
id_token = id_token_verified['id_token']
print "id_token: %s" % id_token
# 4. userinfo request
_req_session = requests.Session()
_req_session.headers.update({
"Authorization": "Bearer %s" % token['access_token']})
userinfo_req = _req_session.get(OIDC_USERINFO_ENDPOINT)
userinfo = json.loads(userinfo_req.text)
# login the user
session['username'] = userinfo['nickname']
session['email'] = userinfo['email']
session['title'] = userinfo.get('title', '')
session['empno'] = userinfo.get('empno', '')
session['dep'] = userinfo.get('dep', '')
session['fullname'] = userinfo.get('fullname', '')
return redirect("/")
if __name__ == "__main__":
import sys
import argparse
parser = argparse.ArgumentParser(
usage='%(prog)s [options]', version='%(prog)s ' + str(__revision__))
parser.add_argument(
'-H', '--host', dest='host', type=str,
help="Specify listening adress, default is 127.0.0.1")
parser.add_argument(
'-p', '--port', dest='port', type=int,
help="Specify listening port, default is 5000")
parser.add_argument(
'-c', '--client_id', dest='client_id', type=str,
help="oidc client_id is required.")
parser.add_argument(
'-s', '--client_secret', dest='client_secret', type=str,
help="oidc client_secret is required.")
args = parser.parse_args()
host = args.host or '127.0.0.1'
port = args.port or 5000
OIDC_REDIRECT_URI = "http://%s:%s/finish" % (host, port)
OIDC_CLIENT_ID = args.client_id
OIDC_CLIENT_SECRET = args.client_secret
if not OIDC_CLIENT_ID or not OIDC_CLIENT_SECRET:
parser.print_help()
parser.exit()
PYTHON_OIDC_DEMO.secret_key = "this is a random secret"
PYTHON_OIDC_DEMO.debug = True
PYTHON_OIDC_DEMO.run(host=host, port=port)
```