# Application setup: Flask app, database configuration, ORM models and the
# Flask-Login manager instance.
import datetime
import os
import urllib.parse

from flask import Flask, Blueprint, render_template, redirect, url_for, request, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import UserMixin, LoginManager, login_user, login_required, logout_user, current_user

import save_azure

db = SQLAlchemy()

# SECURITY: the literal below is a live-looking credential committed to source.
# It is kept only as a fallback so existing deployments keep working; set
# ECHO_ODBC_CONNECT in the environment, rotate the password, and then delete
# this fallback.
_DEFAULT_ODBC_CONNECT = (
    "Driver={ODBC Driver 18 for SQL Server};Server=tcp:theecho.database.windows.net,1433;Database=echo;Uid=echoDB;Pwd=Mariposa2502mARIPOSA2502$$2502$$;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;"
)
# The ODBC connection string must be URL-quoted before it is embedded in the
# SQLAlchemy URI.
params = urllib.parse.quote_plus(os.environ.get("ECHO_ODBC_CONNECT", _DEFAULT_ODBC_CONNECT))

app = Flask(__name__)
# SECURITY: same caveat as above; override with ECHO_SECRET_KEY in production.
app.config['SECRET_KEY'] = os.environ.get('ECHO_SECRET_KEY', 'alpha-echo-testing')
app.config['SQLALCHEMY_DATABASE_URI'] = "mssql+pyodbc:///?odbc_connect=%s" % params
# NOTE(review): this option is deprecated in Flask-SQLAlchemy; handlers should
# call db.session.commit() themselves rather than rely on it.
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
# app.config['SECRET_KEY'] = 'echo-alpha-testing'
# app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.sqlite'

db.init_app(app)


class User(UserMixin, db.Model):
    """Registered account; owns a collection of Echo rows via ``echos``."""

    id = db.Column(db.Integer, primary_key=True)
    handle = db.Column(db.String(25))
    email = db.Column(db.String(100), unique=True)
    password = db.Column(db.String(100))
    name = db.Column(db.String(1000))
    pp = db.Column(db.String(1000))
    # One-to-many: each Echo gets a ``user`` back-reference.
    echos = db.relationship('Echo', backref='user', lazy=True)


class Echo(db.Model):
    """A post authored by a User; owns a collection of Reply rows."""

    id = db.Column(db.Integer, primary_key=True)
    handle = db.Column(db.String(25))
    name = db.Column(db.String(1000))
    echo = db.Column(db.String(2000))
    # Pass the callable (not a fixed value) so each row is stamped at insert
    # time, matching Reply.date. Previously every echo received the same
    # hard-coded string timestamp.
    date = db.Column(db.DateTime, default=datetime.datetime.utcnow, nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    # One-to-many: each Reply gets an ``echo`` back-reference.
    replies = db.relationship('Reply', backref='echo', lazy=True)


class Reply(db.Model):
    """A reply attached to a single Echo."""

    id = db.Column(db.Integer, primary_key=True)
    handle = db.Column(db.String(25))
    name = db.Column(db.String(1000))
    reply = db.Column(db.String(2000))
    date = db.Column(db.DateTime, default=datetime.datetime.utcnow, nullable=False)
    echo_id = db.Column(db.Integer, db.ForeignKey('echo.id'), nullable=False)


# Create any missing tables at import time (requires an application context).
with app.app_context():
    db.create_all()

login_manager = LoginManager()
login_manager.login_view = 'login'
login_manager.init_app(app)


@login_manager.user_loader
def load_user(user_id):
    """Return the User for the id stored in the login session."""
    # since the user_id is just the primary key of our user table, use it in the query for the user
    return User.query.get(int(user_id))


def _passwords_match(stored, supplied):
    """Return True only when both passwords are present and equal.

    Uses a constant-time comparison on the UTF-8 bytes so non-ASCII passwords
    are handled and comparison time does not leak the match position.
    """
    import hmac  # local import: keeps this block independent of the file header

    if stored is None or supplied is None:
        return False
    return hmac.compare_digest(stored.encode('utf-8'), supplied.encode('utf-8'))


@app.route("/")
def index():
    """Render the home page with every echo and every reply."""
    return render_template('index.html', echos=Echo.query.all(), replies=Reply.query.all())


@app.route('/profile')
@login_required
def profile():
    """Render the logged-in user's profile with their own echos."""
    return render_template('profile.html', name=current_user.name, echos=current_user.echos)


@app.route('/echo', methods=['POST'])
@login_required
def echo():
    """Create an Echo from the ``echo`` form field for the current user."""
    text = request.form.get('echo')
    db.session.add(Echo(echo=text, user_id=current_user.id, handle=current_user.handle, name=current_user.name))
    db.session.commit()
    return redirect(url_for('profile'))


@app.route('/reply', methods=['POST'])
@login_required
def reply():
    """Create a Reply from the ``reply`` and ``echo_id`` form fields."""
    text = request.form.get('reply')
    echo_id = request.form.get('echo_id')
    db.session.add(Reply(reply=text, echo_id=echo_id, handle=current_user.handle, name=current_user.name))
    db.session.commit()
    return redirect(url_for('index'))


@app.route('/login')
def login():
    """Render the login form."""
    return render_template('login.html')


@app.route('/login', methods=['POST'])
def login_post():
    """Authenticate the submitted email/password and start a session.

    Redirects back to the login form with a flash message when the email is
    unknown or the password does not match; otherwise logs the user in and
    redirects to the profile page.
    """
    email = request.form.get('email')
    password = request.form.get('password')
    remember = bool(request.form.get('remember'))

    user = User.query.filter_by(email=email).first()

    # The previous check tested a two-element tuple, which is always truthy,
    # so any password was accepted for an existing account.
    # TODO(security): passwords are stored and compared in plaintext. Moving to
    # salted hashes needs a wider ``password`` column and a migration of the
    # existing rows.
    if not user or not _passwords_match(user.password, password):
        flash('Please check your login details and try again.')
        return redirect(url_for('login'))  # if the user doesn't exist or password is wrong, reload the page

    login_user(user, remember=remember)
    return redirect(url_for('profile'))


@app.route('/signup')
def signup():
    """Render the signup form."""
    return render_template('signup.html')


@app.route('/signup', methods=['POST'])
def signup_post():
    """Create a new User from the signup form and redirect to the login page.

    Redirects back to the signup form with a flash message when a required
    field is missing or the email/handle is already taken.
    """
    handle = request.form.get('handle')
    email = request.form.get('email')
    name = request.form.get('name')
    password = request.form.get('password')

    # Login needs email and password, and the profile-picture upload needs
    # the handle, so refuse to create an account without them.
    if not handle or not email or not password:
        flash('Please fill in all required fields.')
        return redirect(url_for('signup'))

    # Report which of the two values is already taken.
    if User.query.filter_by(email=email).first():
        flash('Email address already exists')
        return redirect(url_for('signup'))
    if User.query.filter_by(handle=handle).first():
        flash('Handle already exists')
        return redirect(url_for('signup'))

    # TODO(security): the password is persisted exactly as submitted; see the
    # note in login_post about migrating to salted hashes.
    new_user = User(handle=handle, email=email, name=name, password=password)

    # add the new user to the database
    db.session.add(new_user)
    db.session.commit()

    return redirect(url_for('login'))


# The URL variable is required to supply ``handle``; a bare '/' duplicated the
# index route and could never call this view correctly.
@app.route('/<handle>')
@login_required
def user(handle):
    """Render the page listing every echo posted under ``handle``."""
    return render_template('user.html', handle=handle, echos=Echo.query.filter_by(handle=handle).all())


# upload a profile picture
@app.route('/pp', methods=['POST'])
@login_required
def pp():
    """Store an uploaded .jpg/.png as the current user's profile picture.

    The file is saved under ``static/pp/`` as ``<handle><ext>``, handed to
    ``save_azure.save``, and the value that call returns is stored on the
    current user's ``pp`` column (presumably the uploaded blob's URL; confirm
    against save_azure).
    """
    upload = request.files.get('file')
    original_name = (upload.filename or '').lower() if upload else ''

    if original_name.endswith('.jpg'):
        extension = '.jpg'
    elif original_name.endswith('.png'):
        extension = '.png'
    else:
        # Previously an unsupported or missing file crashed with an unbound
        # ``filename``; reject it explicitly instead.
        flash('Please upload a .jpg or .png image.')
        return redirect(url_for('profile'))

    filename = current_user.handle + extension
    path = 'static/pp/' + filename
    upload.save(path)
    app.logger.debug('saved profile picture to %s', path)

    # Update the existing row; inserting a second User with the same primary
    # key (as before) conflicts with the logged-in user's record.
    current_user.pp = save_azure.save(path, filename)
    db.session.commit()
    return redirect(url_for('profile'))


# NOTE(review): the route below presumably carried URL variables for
# ``handle`` and ``echo_id`` that were lost; restore them before re-enabling.
# @app.route('//')
# @login_required
# def echo_page(handle, echo_id):
#     return render_template('echo.html', handle=handle, echo=Echo.query.get(echo_id), replies=Reply.query.filter_by(echo_id=echo_id).all())


@app.route('/logout')
@login_required
def logout():
    """End the current session and return to the home page."""
    logout_user()
    return redirect(url_for('index'))


# if __name__ == "__main__":
#     app.run(debug=True)
#     # app.run(host='0.0.0.0', port=443, ssl_context=('cert.pem', 'privkey.pem'))