Coverage for yaptide/admin/db_manage.py: 28%
228 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-04 00:31 +0000
1#! /usr/bin/env python
2"""
3This script is used to manage the database via a command line application.
4It is supposed to be used outside of the docker container, therefore on purpose it has
5no dependency on the yaptide package. All the tables used by ORM are defined here again.
6We do not use ORM model classes here, because we do not want to import the yaptide package.
7"""
8from enum import Enum, auto
9import os
11import click
12import sqlalchemy as db
13from werkzeug.security import generate_password_hash
class TableTypes(Enum):
    """Names of the database tables handled by this script.

    Code in this file looks tables up in the reflected metadata through
    ``TableTypes.<member>.name``, so each member name has to be spelled
    exactly like the corresponding table name.
    """

    # Values come from auto(); the lookups in this file rely on the names only.
    User = auto()
    YaptideUser = auto()
    KeycloakUser = auto()
    Simulation = auto()
    Cluster = auto()
    Task = auto()
    Result = auto()
    Input = auto()
    Estimator = auto()
    Page = auto()
def connect_to_db(verbose: int = 0) -> tuple[db.Connection, db.MetaData, db.Engine]:
    """Open a database connection and reflect the existing schema.

    The database URI is read from the ``FLASK_SQLALCHEMY_DATABASE_URI``
    environment variable.

    Args:
        verbose: verbosity level; a value above 1 echoes the URI and turns on
            SQLAlchemy statement echoing.

    Returns:
        A tuple of (open connection, reflected metadata, engine).

    Raises:
        click.Abort: when the URI is not set or the database reports an
            operational error while connecting or reflecting.
    """
    db_uri = os.environ.get('FLASK_SQLALCHEMY_DATABASE_URI')
    if verbose > 1:
        click.echo(f'Connecting to URI: {db_uri}')
    if not db_uri:
        click.echo(f'Database URI: {db_uri} not set - aborting', err=True)
        raise click.Abort()
    echo: bool = verbose > 1
    engine = db.create_engine(db_uri, echo=echo)
    con = None
    try:
        con = engine.connect()
        metadata = db.MetaData()
        metadata.reflect(bind=engine)
    except db.exc.OperationalError as err:
        # Reflection can fail after the connection was already opened: release the
        # connection and the engine's pool instead of leaving them dangling.
        if con is not None:
            con.close()
        engine.dispose()
        click.echo(f'Connection to db {db_uri} failed', err=True)
        # Keep the database error attached as the cause for easier debugging.
        raise click.Abort() from err
    return con, metadata, engine
def user_exists(name: str, auth_provider: str, users: db.Table, con) -> bool:
    """Tell whether a user with the given name and auth provider is stored.

    Args:
        name: username to look for.
        auth_provider: value of the ``auth_provider`` column to match.
        users: table that is queried.
        con: connection used to execute the query.

    Returns:
        True when at least one matching row is found (a notice is echoed in
        that case), False otherwise.
    """
    lookup = db.select(users).filter_by(username=name, auth_provider=auth_provider)
    matching_rows = con.execute(lookup).all()
    if not matching_rows:
        return False
    click.echo(f'User: {name} exists')
    return True
# Root click group: the commands defined in this file attach to it via ``@run.command``.
# The docstring below is what click displays as the help text of the group.
@click.group()
def run() -> None:
    """Manage database"""
@run.command
@click.option('-v', '--verbose', count=True)
def list_users(verbose):
    """List users"""
    # Echoes the number of stored users followed by one line per user
    # (login and auth provider). `verbose` is forwarded to connect_to_db.
    connection, schema, _engine = connect_to_db(verbose=verbose)
    user_table = schema.tables[TableTypes.User.name]
    query = db.select(user_table.c.username, user_table.c.auth_provider)
    rows = connection.execute(query).all()

    click.echo(f"{len(rows)} users in DB:")
    for row in rows:
        click.echo(f"Login {row.username}; Auth provider {row.auth_provider}")
@run.command
@click.argument('name')
@click.option('password', '--password', default='')
@click.option('-v', '--verbose', count=True)
def add_user(name, password, verbose):
    """Add yaptide user to database"""
    # Inserts a row into the User table and a matching YaptideUser row holding
    # the password hash; aborts when such a YaptideUser is already present.
    provider = "YaptideUser"
    connection, schema, _engine = connect_to_db(verbose=verbose)
    click.echo(f'Adding user: {name}')
    if verbose > 2:
        click.echo(f'Password: {password}')

    user_table = schema.tables[TableTypes.User.name]
    already_present = user_exists(name=name, auth_provider=provider, users=user_table, con=connection)
    if already_present:
        click.echo(f'YaptideUser: {name} already exists, aborting add')
        raise click.Abort()

    click.echo(f'YaptideUser: {name} does not exist, adding')
    click.echo(f'Adding user: {name}')

    # Base row first: its primary key becomes the id of the YaptideUser row.
    insert_user = db.insert(user_table).values(username=name, auth_provider=provider)
    new_user_id = connection.execute(insert_user).inserted_primary_key[0]
    hashed_password = generate_password_hash(password)

    credentials_table = schema.tables[TableTypes.YaptideUser.name]
    insert_credentials = db.insert(credentials_table).values(id=new_user_id, password_hash=hashed_password)
    connection.execute(insert_credentials)
    # Both inserts are committed together.
    connection.commit()
@run.command
@click.argument('name')
@click.option('password', '--password', default='')
@click.option('-v', '--verbose', count=True)
def update_user(name, password, verbose):
    """Update user in database"""
    # Replaces the stored password hash of an existing YaptideUser when a
    # non-empty password is given; aborts when the user is not found.
    connection, schema, _engine = connect_to_db(verbose=verbose)
    click.echo(f'Updating user: {name}')
    user_table = schema.tables[TableTypes.User.name]

    found = user_exists(name, "YaptideUser", user_table, connection)
    if not found:
        click.echo(f'YaptideUser: {name} does not exist, aborting update')
        raise click.Abort()

    # An empty password means there is nothing to change.
    if password:
        new_hash: str = generate_password_hash(password)
        credentials_table = schema.tables[TableTypes.YaptideUser.name]
        # The YaptideUser row is selected through the User row sharing its id.
        update_stmt = (
            db.update(credentials_table)
            .where(
                credentials_table.c.id == user_table.c.id,
                user_table.c.username == name,
                user_table.c.auth_provider == "YaptideUser",
            )
            .values(password_hash=new_hash)
        )
        connection.execute(update_stmt)
        connection.commit()
        if verbose > 2:
            click.echo(f'Updating password: {password}')
    click.echo(f'Successfully updated user: {name}')
@run.command
@click.argument('name')
@click.argument('auth_provider')
def remove_user(name, auth_provider):
    """Delete user"""
    # Deletes the User row matching both the name and the auth provider;
    # aborts when no such user is stored.
    connection, schema, _engine = connect_to_db()
    click.echo(f'Deleting user: {name}')
    user_table = schema.tables[TableTypes.User.name]

    if not user_exists(name, auth_provider, user_table, connection):
        click.echo(f"Aborting, user {name} does not exist")
        raise click.Abort()

    delete_stmt = db.delete(user_table).where(
        user_table.c.username == name,
        user_table.c.auth_provider == auth_provider,
    )
    connection.execute(delete_stmt)
    connection.commit()
    click.echo(f'Successfully deleted user: {name}')
@run.command
@click.option('sim_id', '--sim_id')
@click.option('user', '--user')
@click.option('auth_provider', '--auth-provider')
def list_tasks(user, auth_provider, sim_id):
    """List tasks"""
    # Echoes the stored tasks, optionally narrowed down by username, auth
    # provider and simulation id; aborts when the given username is unknown.
    connection, schema, _engine = connect_to_db()
    task_table = schema.tables[TableTypes.Task.name]
    user_table = schema.tables[TableTypes.User.name]
    sim_table = schema.tables[TableTypes.Simulation.name]

    if user:
        user_lookup = db.select(user_table).filter_by(username=user)
        if not connection.execute(user_lookup).all():
            click.echo(f"Aborting, user {user} does not exist")
            raise click.Abort()

    # Only options that were actually supplied become filter criteria.
    user_filters = {
        column: value
        for column, value in (('username', user), ('auth_provider', auth_provider))
        if value
    }
    task_filters = {'simulation_id': int(sim_id)} if sim_id else {}

    # The statement is built in the same call order as before: the task filter is
    # applied before the joins and the user filter right after joining the users.
    query = db.select(task_table.c.simulation_id, task_table.c.task_id, user_table.c.username,
                      task_table.c.task_state)
    query = query.select_from(task_table).filter_by(**task_filters)
    query = query.join(sim_table, task_table.c.simulation_id == sim_table.c.id)
    query = query.join(user_table, sim_table.c.user_id == user_table.c.id).filter_by(**user_filters)
    query = query.order_by(task_table.c.simulation_id, task_table.c.task_id)
    rows = connection.execute(query).all()

    click.echo(f"{len(rows)} tasks in DB:")
    for row in rows:
        columns = (
            f"Simulation id {row.simulation_id}",
            f"Task id ...{row.task_id}",
            f"task_state {row.task_state}",
            # the username column is left empty when a user filter was given
            '' if user else f" username {row.username}",
        )
        click.echo('; '.join(columns))
@run.command
@click.argument('simulation_id')
@click.argument('task_id')
@click.option('-v', '--verbose', count=True)
def remove_task(simulation_id, task_id, verbose):
    """Delete task"""
    # Deletes the task identified by (simulation_id, task_id); both arguments are
    # converted with int(). Aborts when no such task is stored.
    con, metadata, _ = connect_to_db(verbose=verbose)
    click.echo(f'Deleting task: {task_id} from simulation: {simulation_id}')
    tasks = metadata.tables[TableTypes.Task.name]
    simulation_id = int(simulation_id)
    task_id = int(task_id)

    stmt = db.select(tasks).filter_by(task_id=task_id, simulation_id=simulation_id)
    task_found = con.execute(stmt).all()
    if not task_found:
        click.echo(f"Aborting, task {task_id} does not exist")
        raise click.Abort()

    # The lookup above keys a task on (task_id, simulation_id), so the delete must
    # use the same pair; filtering on task_id alone would also remove rows with
    # that task_id belonging to other simulations.
    query = db.delete(tasks).where(
        tasks.c.task_id == task_id,
        tasks.c.simulation_id == simulation_id,
    )
    con.execute(query)
    con.commit()
    click.echo(f'Successfully deleted task: {task_id}')
@run.command
@click.option('-v', '--verbose', count=True)
@click.option('user', '--user')
@click.option('auth_provider', '--auth-provider')
def list_simulations(verbose, user, auth_provider):
    """List simulations"""
    # Echoes the stored simulations, optionally narrowed down by username and
    # auth provider; aborts when the given username is unknown.
    connection, schema, _engine = connect_to_db(verbose=verbose)
    sim_table = schema.tables[TableTypes.Simulation.name]
    user_table = schema.tables[TableTypes.User.name]

    if user:
        user_lookup = db.select(user_table).filter_by(username=user)
        if not connection.execute(user_lookup).all():
            click.echo(f"Aborting, user {user} does not exist")
            raise click.Abort()

    # Only options that were actually supplied become filter criteria.
    user_filters = {
        column: value
        for column, value in (('username', user), ('auth_provider', auth_provider))
        if value
    }

    query = db.select(sim_table.c.id, sim_table.c.job_id, sim_table.c.start_time,
                      sim_table.c.end_time, user_table.c.username)
    query = query.select_from(sim_table).join(user_table, sim_table.c.user_id == user_table.c.id)
    query = query.filter_by(**user_filters)
    rows = connection.execute(query).all()

    click.echo(f"{len(rows)} simulations in DB:")
    for row in rows:
        # the username is printed only when no user filter was given
        user_column = '' if user else f" username {row.username}"
        click.echo(
            f"id {row.id}; job id {row.job_id}; start_time {row.start_time}; end_time {row.end_time};{user_column}")
@run.command
@click.argument('simulation_id')
@click.option('-v', '--verbose', count=True)
def remove_simulation(simulation_id, verbose):
    """Delete simulation"""
    # Deletes the simulation row with the given id; aborts when it is not stored.
    # The id is converted with int() before the database is contacted.
    simulation_id = int(simulation_id)
    connection, schema, _engine = connect_to_db(verbose=verbose)
    click.echo(f'Deleting simulation: {simulation_id}')
    sim_table = schema.tables[TableTypes.Simulation.name]

    lookup = db.select(sim_table).filter_by(id=simulation_id)
    existing = connection.execute(lookup).all()
    if not existing:
        click.echo(f"Aborting, simulation {simulation_id} does not exist")
        raise click.Abort()

    delete_stmt = db.delete(sim_table).where(sim_table.c.id == simulation_id)
    connection.execute(delete_stmt)
    connection.commit()
    click.echo(f'Successfully deleted simulation: {simulation_id}')
@run.command
@click.argument('cluster_name')
@click.option('-v', '--verbose', count=True)
def add_cluster(cluster_name, verbose):
    """Adds cluster with provided name to database if it does not exist"""
    connection, schema, _engine = connect_to_db(verbose=verbose)
    clusters = schema.tables[TableTypes.Cluster.name]

    # A cluster name may be stored only once: abort when it is already there.
    lookup = db.select(clusters).filter_by(cluster_name=cluster_name)
    if connection.execute(lookup).all():
        click.echo(f"Cluster {cluster_name} already exists in DB")
        raise click.Abort()

    connection.execute(db.insert(clusters).values(cluster_name=cluster_name))
    connection.commit()
    click.echo(f"Cluster {cluster_name} added to DB")
@run.command
def list_clusters():
    """List clusters"""
    # Echoes the number of stored clusters followed by one line per cluster.
    connection, schema, _engine = connect_to_db()
    cluster_table = schema.tables[TableTypes.Cluster.name]
    rows = connection.execute(db.select(cluster_table)).all()

    click.echo(f"{len(rows)} clusters in DB:")
    for row in rows:
        click.echo(f"id {row.id}; cluster name {row.cluster_name};")
# Invoke the click command group only when this file is executed as a script.
if __name__ == "__main__":
    run()