move to abl

This commit is contained in:
grimhilt 2023-08-05 16:05:58 +02:00
parent 92e2b6c92a
commit f6cc938fb7
12 changed files with 236 additions and 64 deletions

View File

@ -1,6 +1,7 @@
from flask import Flask from flask import Flask
from flask_sqlalchemy import SQLAlchemy from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS from flask_cors import CORS
from flask_login import LoginManager
from os import path from os import path
import logging import logging
@ -13,20 +14,33 @@ def create_api():
app = Flask(__name__) app = Flask(__name__)
CORS(app) CORS(app)
logging.getLogger('flask_cors').level = logging.DEBUG logging.getLogger('flask_cors').level = logging.DEBUG
#CORS(app, resources={r"/*": {"origin": ["http://localhost:3008"]}})
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_NAME}' app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{DB_NAME}'
app.secret_key = b'_5#y2L"F4Qfj8zxec]'
login_manager = LoginManager()
login_manager.login_view = 'auth.login'
login_manager.init_app(app)
db.init_app(app) db.init_app(app)
from .abl.user import user from .controllers.user import user
from .abl.playlist import playlist from .controllers.playlist import playlist
from .abl.file import file from .controllers.file import file
from .controllers.auth import auth
from .controllers.roles import roles
app.register_blueprint(user, url_prefix='/api/user') app.register_blueprint(user, url_prefix='/api/user')
app.register_blueprint(playlist, url_prefix='/api/playlist') app.register_blueprint(playlist, url_prefix='/api/playlist')
app.register_blueprint(file, url_prefix='/api/file') app.register_blueprint(file, url_prefix='/api/file')
app.register_blueprint(auth, url_prefix='/auth')
app.register_blueprint(roles, url_prefix='/api/roles')
from .models import User, Playlist, PlaylistFile, File from .models import User, Playlist, PlaylistFile, File
@login_manager.user_loader
def load_user(user_id):
return db.session.query(User).get(int(user_id))
with app.app_context(): with app.app_context():
db.create_all() db.create_all()

52
src/api/abl/AuthAbl.py Normal file
View File

@ -0,0 +1,52 @@
from flask import Blueprint, request, jsonify, make_response
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import login_user, login_required, current_user, logout_user
from ..models import User, Role, UserRole
from .. import db
class AuthAbl:
@staticmethod
def signup(data):
login = data['login']
password = data['password']
is_first_user = db.session.query(User).count() == 0
if not is_first_user and current_user is None:
return jsonify(message="You cannot create an account without being authenticated"), 401
user = db.session.query(User).filter_by(login=login).first()
if user:
return jsonify(user.as_dict()), 302
new_user = User(login=login, password=generate_password_hash(password, method='sha256'))
if is_first_user:
new_role = Role(name="admin", can_create_role=True, can_create_playlist=True)
db.session.add(new_role)
new_user.roles.append(new_role)
db.session.add(new_user)
db.session.flush()
db.session.commit()
return jsonify(new_user.as_dict())
@staticmethod
def login(data):
login = data['login']
password = data['password']
user = db.session.query(User).filter_by(login=login).first()
if not user or not check_password_hash(user.password, password):
return jsonify(message="Incorrect credentials"), 401
login_user(user)
return jsonify(success=True)
@staticmethod
def profile():
pr = current_user.as_dict()
return jsonify(pr)

View File

@ -0,0 +1,15 @@
from flask import jsonify
class PlaylistAbl:
@staticmethod
def create(data):
print("create in")
return jsonify(), 200
#new_playlist = Playlist(name=data['name'])
#db.session.add(new_playlist)
#db.session.flush()
#db.session.commit()
#res = new_playlist.as_dict()
#res['last_modified'] = res['last_modified'].isoformat()
#return jsonify(res)

View File

@ -1,47 +0,0 @@
from flask import Blueprint, request, jsonify, make_response
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import login_user, login_required, current_user, logout_user
from ..models import User
from .. import db
auth = Blueprint('auth', __name__)
@auth.route('/login', methods=['POST'])
def login():
data = request.get_json()
login = data['login']
password = data['password']
user = db.session.query(User).filter_by(login=login).first()
if not user or not check_password_hash(user.password, password):
return jsonify(message="Incorrect credentials"), 401
login_user(user)
return jsonify(success=True)
@auth.route('/signup', methods=['POST'])
def signup():
data = request.get_json()
login = data['login']
password = data['password']
user = db.session.query(User).filter_by(login=login).first()
if user:
return jsonify(user.as_dict_safe()), 302
new_user = User(login=login, password=generate_password_hash(password, method='sha256'))
db.session.add(new_user)
db.session.flush()
db.session.commit()
return jsonify(new_user.as_dict_safe())
@auth.route('/logout', methods=['POST'])
@login_required
def logout():
logout_user()
return jsonify(success=True)
@auth.route('/profile')
@login_required
def profile():
return jsonify(current_user.as_dict_safe())

View File

@ -0,0 +1,25 @@
from flask import Blueprint, request, jsonify
from flask_login import login_required, logout_user
from ..abl.AuthAbl import AuthAbl
auth = Blueprint('auth', __name__)
@auth.route('/login', methods=['POST'])
def login():
return AuthAbl.login(request.get_json())
@auth.route('/signup', methods=['POST'])
def signup():
return AuthAbl.signup(request.get_json())
@auth.route('/logout', methods=['POST'])
@login_required
def logout():
logout_user()
return jsonify(success=True)
@auth.route('/profile')
@login_required
def profile():
return AuthAbl.profile()

View File

@ -8,6 +8,7 @@ FILE_DIR = './data/'
@file.route('/', methods=['POST']) @file.route('/', methods=['POST'])
def upload(): def upload():
files = request.files.getlist('file') files = request.files.getlist('file')
print(files)
res = [] res = []
for file in files: for file in files:
exists = db.session.query(File).filter(File.name == file.filename).first() exists = db.session.query(File).filter(File.name == file.filename).first()

View File

@ -5,21 +5,18 @@ from .. import db
from datetime import datetime from datetime import datetime
from sqlalchemy.sql import func from sqlalchemy.sql import func
from ..dao.Playlist import PlaylistDao from ..dao.Playlist import PlaylistDao
from flask_login import login_required, current_user
from ..abl.PlaylistAbl import PlaylistAbl
from screen.ScreenManager import ScreenManager from screen.ScreenManager import ScreenManager
from ..permissions import Perm, permissions
playlist = Blueprint('playlist', __name__) playlist = Blueprint('playlist', __name__)
@playlist.route('/', methods=['PUT']) @playlist.route('/', methods=['POST'])
@login_required
@permissions.require([Perm.CREATE_PLAYLIST])
def create(): def create():
data = request.get_json() return PlaylistAbl.create(request.get_json())
new_playlist = Playlist(name=data['name'])
db.session.add(new_playlist)
db.session.flush()
db.session.commit()
res = new_playlist.as_dict()
res['last_modified'] = res['last_modified'].isoformat()
return jsonify(res)
@playlist.route('/', methods=["GET"]) @playlist.route('/', methods=["GET"])
def list(): def list():
@ -87,7 +84,7 @@ def remove_file(playlist_id):
db.session.commit() db.session.commit()
return jsonify(success=True) return jsonify(success=True)
@playlist.route('/<int:playlist_id>/update', methods=["POST"]) @playlist.route('/<int:playlist_id>/update', methods=["PUT"])
def update(playlist_id): def update(playlist_id):
data = request.get_json() data = request.get_json()
db.session.query(Playlist) \ db.session.query(Playlist) \

View File

@ -0,0 +1,35 @@
from flask import Blueprint, request, jsonify, make_response
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import login_user, login_required, current_user, logout_user
from ..models import Role
from .. import db
roles = Blueprint('roles', __name__)
@roles.route('/', methods=['POST'])
@login_required
def create():
data = request.get_json()
parent_id = data['parent_id'] if "parent_id" in data else None
can_create_role = data['can_create_role'] if data['can_create_role'] else False
can_create_playlist = data['can_create_playlist'] if data['can_create_playlist'] else False
name = data['name']
role = db.session.query(Role).filter_by(name=name).first()
if role:
return jsonify(message="A role with this name already exists"), 400
new_role = Role(name=name, parent_id=parent_id, can_create_role=can_create_role, can_create_playlist=can_create_playlist)
db.session.add(new_role)
db.session.flush()
db.session.commit()
return jsonify(new_role.as_dict())
@roles.route('/<int:role_id>', methods=['GET'])
@login_required
def get(role_id):
role = db.session.query(Role).filter_by(id=role_id).first()
if role:
return jsonify(role.as_dict())
return jsonify(), 404

View File

@ -33,7 +33,34 @@ class Playlist(db.Model):
def as_dict(self): def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns} return {c.name: getattr(self, c.name) for c in self.__table__.columns}
class UserRole(db.Model):
__tablename__ = 'UserRole'
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), primary_key=True)
role_id = db.Column(db.Integer, db.ForeignKey('role.id'), primary_key=True)
class Role(db.Model):
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
name = db.Column(db.String)
parent_id = db.Column(db.Integer, db.ForeignKey('role.id'))
can_create_role = db.Column(db.Boolean, default=False)
can_create_playlist = db.Column(db.Boolean, default=False)
users = db.relationship('User', secondary='UserRole', back_populates='roles')
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
class User(db.Model, UserMixin): class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key = True, autoincrement=True) id = db.Column(db.Integer, primary_key = True, autoincrement=True)
login = db.Column(db.String(150)) login = db.Column(db.String(150))
password = db.Column(db.String(150)) password = db.Column(db.String(150))
roles = db.relationship('Role', secondary='UserRole', back_populates='users')
def as_dict(self):
res = self.as_dict_unsafe()
res['roles'] = [role.as_dict() for role in self.roles]
del res['password']
return res
def as_dict_unsafe(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

53
src/api/permissions.py Normal file
View File

@ -0,0 +1,53 @@
from enum import Enum
import functools
from flask import request, jsonify
from flask_login import current_user
from . import db
from .models import Playlist, PlaylistFile, User, Role, UserRole
Perm = Enum('Perm', ['CREATE_ROLE', 'CREATE_PLAYLIST'])
class permissions:
@staticmethod
def require(permissions):
def decorator_require_permissions(func):
@functools.wraps(func)
def wrapper_require_permissions(*args, **kwargs):
for perm in permissions:
check_perm = CheckPermissionFactory(perm)
if not check_perm.is_valid():
return jsonify( \
message=check_perm.message), \
check_perm.status_code
return func(*args, **kwargs)
return wrapper_require_permissions
return decorator_require_permissions
def CheckPermissionFactory(perm):
print(perm)
match perm:
case Perm.CREATE_ROLE:
return CheckCreateRole()
case Perm.CREATE_PLAYLIST:
print("creat plays")
return CheckCreatePlaylist()
case _:
return CheckNone()
class CheckNone:
def is_valid(self):
return True
class CheckCreatePlaylist:
def is_valid(self):
q = db.session.query(User) \
.filter_by(id=current_user.as_dict()['id']) \
.first()
print(q.as_dict())

View File

@ -5,7 +5,7 @@ api = create_api()
screen_manager = ScreenManager().getInstance() screen_manager = ScreenManager().getInstance()
if __name__ == '__main__': if __name__ == '__main__':
#api.run(host="0.0.0.0", port=5500, debug=True) api.run(host="0.0.0.0", port=5500, debug=True)
api.run(host="0.0.0.0", port=5500) #api.run(host="0.0.0.0", port=5500)