Thanks, Bram. I ended up building it with your API in Python.
import requests
class BaserowAPI:
    """Hold Baserow login credentials and exchange them for a JWT."""

    def __init__(self, username, password, server_url):
        """Store the credentials and the server base URL.

        Args:
            username: Sent as the ``email`` field of the token request.
            password: Sent as the ``password`` field of the token request.
            server_url: Base URL that ``/api/user/token-auth/`` is appended to.
        """
        self.username = username
        self.password = password
        self.server_url = server_url

    def generate_token(self):
        """Request a JWT from the server's token-auth endpoint.

        Returns:
            The value of the ``token`` key in the JSON response.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            KeyError: If the JSON response has no ``token`` key.
        """
        token_request_url = f'{self.server_url}/api/user/token-auth/'
        response = requests.post(
            token_request_url,
            json={'email': self.username, 'password': self.password},
        )
        # The response body carries the token itself, so it is deliberately
        # not echoed to stdout where it could end up in logs.
        response.raise_for_status()
        return response.json()['token']
def get_database_table(table_id, jwt_token):
    """GET ``/api/database/tables/{table_id}/`` and return the parsed JSON.

    The server base URL is read from the module-level ``serverlink``.

    Args:
        table_id: ID interpolated into the request URL.
        jwt_token: Token sent in the ``Authorization: JWT ...`` header.

    Returns:
        The decoded JSON body when the status code is 200. ``None`` if the
        status is neither 200, 400, 404 nor one that ``raise_for_status``
        treats as an error.

    Raises:
        ValueError: On status 400 or 404, carrying the response's ``error``
            and ``detail`` values.
        requests.HTTPError: On other error statuses.
    """
    reply = requests.get(
        f"{serverlink}/api/database/tables/{table_id}/",
        headers={
            "Authorization": f"JWT {jwt_token}",
            "Content-Type": "application/json",
        },
    )
    status = reply.status_code
    if status == 200:
        return reply.json()
    if status in (400, 404):
        problem = reply.json()
        raise ValueError(problem["error"], problem["detail"])
    reply.raise_for_status()
def create_database_table(auth_token, database_id, name, data=None, first_row_header=False):
    """POST a new table into a database and return its ID.

    The server base URL is read from the module-level ``serverlink``.

    Args:
        auth_token: Token sent in the ``Authorization: JWT ...`` header.
        database_id: ID interpolated into the request URL.
        name: Sent as the ``name`` field of the payload.
        data: Sent as the ``data`` field of the payload (``None`` is sent as-is).
        first_row_header: Sent as the ``first_row_header`` field of the payload.

    Returns:
        The ``id`` of the created table when the status code is 200;
        otherwise the HTTP status code. NOTE: the caller cannot tell the two
        apart by type alone when the ID is also an integer.
    """
    reply = requests.post(
        f"{serverlink}/api/database/tables/database/{database_id}/",
        headers={
            "Authorization": f"JWT {auth_token}",
            "Content-Type": "application/json",
        },
        json={
            "name": name,
            "data": data,
            "first_row_header": first_row_header,
        },
    )
    # Anything other than 200 is reported back as the raw status code.
    if reply.status_code != 200:
        return reply.status_code
    created_table = reply.json()
    return created_table['id']
def create_baserow_database(server_url, workspace_id, database_name, token):
    """POST a new ``database`` application into a workspace.

    Args:
        server_url: Base URL the endpoint path is appended to.
        workspace_id: ID interpolated into the request URL.
        database_name: Sent as the ``name`` field of the payload.
        token: Token sent in the ``Authorization: JWT ...`` header.

    Returns:
        The ``id`` of the created application when the status code is 200;
        otherwise a human-readable failure string containing the status code.
        NOTE: callers must check the type to detect failure.
    """
    reply = requests.post(
        f"{server_url}/api/applications/workspace/{workspace_id}/",
        headers={
            "Authorization": f"JWT {token}",
            "Content-Type": "application/json",
        },
        json={
            "name": database_name,
            "type": "database",
            "init_with_data": False,
        },
    )
    if reply.status_code != 200:
        return f"Request failed with status code {reply.status_code}"
    created_app = reply.json()
    return created_app['id']
def update_database_table_field(field_id, server_url, auth_token, new_field_data):
    """PATCH a field and return the server's view of the updated field.

    Args:
        field_id: ID interpolated into the request URL.
        server_url: Base URL the endpoint path is appended to.
        auth_token: Token sent in the ``Authorization: JWT ...`` header.
        new_field_data: JSON-serialisable body of the PATCH request.

    Returns:
        The decoded JSON response when the status code is 200, otherwise
        ``None`` (a failure message is printed in that case).
    """
    url = f"{server_url}/api/database/fields/{field_id}/"
    headers = {
        "Authorization": f"JWT {auth_token}",
        "Content-Type": "application/json",
    }
    response = requests.patch(url, headers=headers, json=new_field_data)
    if response.status_code != 200:
        print(f"Request failed with status code {response.status_code}")
        return None
    # Hand the parsed field back to the caller instead of discarding it.
    updated_field = response.json()
    print(f"Updated field ID {field_id}:")
    return updated_field
def get_first_field(table_id, server_url, auth_token):
    """Return the first entry of a table's field listing.

    Args:
        table_id: ID interpolated into the request URL.
        server_url: Base URL the endpoint path is appended to.
        auth_token: Token sent in the ``Authorization: JWT ...`` header.

    Returns:
        The first element of the decoded JSON response, or ``None`` when the
        listing is empty or the status code is not 200 (a failure message is
        printed in the latter case).
    """
    url = f"{server_url}/api/database/fields/table/{table_id}/"
    headers = {
        "Authorization": f"JWT {auth_token}",
        "Content-Type": "application/json",
    }
    response = requests.get(url, headers=headers)
    if response.status_code != 200:
        print(f"Request failed with status code {response.status_code}")
        return None
    fields = response.json()
    # No loop needed: only the first element is ever wanted.
    return fields[0] if fields else None
def copy_table_fields_skip_first(table_id, dest_table_id, auth_token, skip_n_fields=1, order_increment=0):
    """Recreate a table's fields in another table, skipping the leading ones.

    The server base URL is read from the module-level ``serverlink``.

    Args:
        table_id: ID of the table whose fields are listed.
        dest_table_id: ID of the table the fields are created in.
        auth_token: Token sent in the ``Authorization: JWT ...`` header.
        skip_n_fields: Number of leading entries of the listing to skip.
        order_increment: Value added to each field's ``order`` before creation.

    Returns:
        A dict mapping each successfully created field's name to its new ID.
        It is empty when the listing request does not return status 200.
    """
    created = {}
    listing = requests.get(
        f"{serverlink}/api/database/fields/table/{table_id}/",
        headers={
            "Authorization": f"JWT {auth_token}",
            "Content-Type": "application/json",
        },
    )
    if listing.status_code != 200:
        print(f"Request failed with status code {listing.status_code}")
        return created

    source_fields = listing.json()
    print(f"Fields for table ID {table_id}:")
    # These keys are dropped from the copied definition before it is posted.
    dropped_keys = ("id", "table_id", "read_only")
    for source_field in source_fields[skip_n_fields:]:
        definition = {
            key: value
            for key, value in source_field.items()
            if key not in dropped_keys
        }
        definition['order'] = int(source_field['order']) + order_increment
        name, new_id = create_field_in_table(auth_token, dest_table_id, definition)
        if name is None or new_id is None:
            continue
        created[name] = new_id
    return created
def create_field_in_table(jwt_token, table_id, field_json):
    """POST a field definition into a table.

    The server base URL is read from the module-level ``serverlink``.

    Args:
        jwt_token: Token sent in the ``Authorization: JWT ...`` header.
        table_id: ID interpolated into the request URL.
        field_json: JSON-serialisable field definition; its ``name`` key is
            read when reporting a failure.

    Returns:
        ``(name, id)`` taken from the response when the status code is 200,
        otherwise ``(None, None)``. A message is printed in both cases.
    """
    reply = requests.post(
        f"{serverlink}/api/database/fields/table/{table_id}/",
        headers={"Authorization": f"JWT {jwt_token}"},
        json=field_json,
    )
    if reply.status_code != 200:
        print(f"Failed to create field '{field_json['name']}' in the destination table: {reply.status_code} - {reply.text}")
        return None, None

    created = reply.json()
    field_name = created["name"]
    field_id = created["id"]
    print(f"Field '{field_name}' with Field ID '{field_id}' created successfully in the destination table")
    return field_name, field_id
def workspace_get_application(application_id, jwt_token):
    """GET ``/api/applications/{application_id}/`` and return the parsed JSON.

    The server base URL is read from the module-level ``serverlink``.

    Args:
        application_id: ID interpolated into the request URL.
        jwt_token: Token sent in the ``Authorization: JWT ...`` header.

    Returns:
        The decoded JSON body when the status code is 200. ``None`` if the
        status is neither 200, 400, 404 nor one that ``raise_for_status``
        treats as an error.

    Raises:
        ValueError: On status 400 or 404, carrying the response's ``error``
            and ``detail`` values.
        requests.HTTPError: On other error statuses.
    """
    reply = requests.get(
        f"{serverlink}/api/applications/{application_id}/",
        headers={
            "Authorization": f"JWT {jwt_token}",
            "Content-Type": "application/json",
        },
    )
    status = reply.status_code
    if status == 200:
        return reply.json()
    if status in (400, 404):
        problem = reply.json()
        raise ValueError(problem["error"], problem["detail"])
    reply.raise_for_status()
def list_database_tables(token, database_id):
    """GET the table listing of a database and return the parsed JSON.

    The server base URL is read from the module-level ``serverlink``.

    Args:
        token: Token sent in the ``Authorization: JWT ...`` header.
        database_id: ID interpolated into the request URL.

    Returns:
        The decoded JSON body when the status code is 200.

    Raises:
        ValueError: ``"Bad request"`` on status 400, ``"Not found"`` on 404.
        requests.HTTPError: On other error statuses.
    """
    reply = requests.get(
        f"{serverlink}/api/database/tables/database/{database_id}/",
        headers={
            "Authorization": f"JWT {token}",
            "Content-Type": "application/json",
        },
    )
    if reply.status_code == 200:
        return reply.json()
    # Statuses that are translated into a ValueError with a fixed message.
    known_failures = {400: "Bad request", 404: "Not found"}
    message = known_failures.get(reply.status_code)
    if message is not None:
        raise ValueError(message)
    reply.raise_for_status()
# Base URL of the Baserow server. Several helper functions in this file read
# this module-level name directly instead of taking the URL as a parameter.
# NOTE(review): this is plain http, so the credentials below would be sent
# unencrypted — confirm whether https should be used for a real server.
serverlink="http://api.baserow.com"
# NOTE(review): credentials are hard-coded here; presumably placeholders —
# replace them before running.
api = BaserowAPI(username='demo@admin.com', password='baserowpwd', server_url=serverlink)
# Generate the JWT token. This performs an HTTP request as soon as this line
# runs; the resulting module-level `strtoken` is read by `copytable`.
strtoken = api.generate_token()
def copytable(source_table_id, dest_database_id, token=None):
    """Copy a table's structure (fields only, no rows) into another database.

    A new table with the source table's name is created, its first field is
    patched with the source table's first field definition, and the remaining
    source fields are recreated after it.

    Args:
        source_table_id: ID of the table to copy.
        dest_database_id: ID of the database the new table is created in.
        token: JWT used for every request. Defaults to the module-level
            ``strtoken`` so existing two-argument calls keep working.

    Raises:
        ValueError: If the first field of the source or destination table
            cannot be fetched, or as raised by ``get_database_table``.
    """
    auth_token = strtoken if token is None else token
    source_table_name = get_database_table(source_table_id, auth_token)['name']
    dest_table_id = create_database_table(
        auth_token,
        dest_database_id,
        source_table_name,
        data=[[""], [""]],
        first_row_header=False,
    )
    first_field_source = get_first_field(source_table_id, serverlink, auth_token)
    first_field_dest = get_first_field(dest_table_id, serverlink, auth_token)
    # get_first_field returns None on failure; stop with a clear message
    # rather than failing on the subscript below or sending an empty PATCH.
    if first_field_source is None:
        raise ValueError(f"Could not read the first field of source table {source_table_id}")
    if first_field_dest is None:
        raise ValueError(f"Could not read the first field of destination table {dest_table_id}")
    update_database_table_field(first_field_dest['id'], serverlink, auth_token, first_field_source)
    copy_table_fields_skip_first(source_table_id, dest_table_id, auth_token)


def copy_database(strtoken, source_db_id, dest_workspace_id):
    """Copy a database's table structures into a new database in a workspace.

    Args:
        strtoken: JWT used for every request, including the per-table copies.
        source_db_id: ID of the database application to copy.
        dest_workspace_id: ID of the workspace the new database is created in.

    Returns:
        Whatever ``create_baserow_database`` returned for the new database.
    """
    dest_db_name = workspace_get_application(source_db_id, strtoken)['name']
    dest_db_id = create_baserow_database(serverlink, dest_workspace_id, dest_db_name, strtoken)
    for eachtable in list_database_tables(strtoken, source_db_id):
        # Forward the caller's token so the table copy does not silently
        # fall back to the module-level one.
        copytable(eachtable['id'], dest_db_id, strtoken)
    return dest_db_id
However, it only works if the table does not have links. To handle links, I think it would need to go slightly deeper: check all the linked IDs and replace them dynamically.
Also, I haven’t implemented copying the rows, since I only needed the table structure.