Creating new tables and copying all fields from another table using the API

I am using your API create_database_table but it’s not letting me create a table without any fields. I want to create a new table and then copy all the fields of an existing source table into this new table.

I tried with list_database_table_fields but first, I need an empty table to copy from source to destination table.

Do you have any other recommendations to achieve this using your API? I need to copy all the fields of one table into a freshly created table in a fresh database created through the API.

Thanks.

Hi @computersrmyfriends, unfortunately, it’s not possible to create a table that’s completely empty. The bare minimum table should always have a primary field. If you want to create a table without any of the auto-create fields, you can make an API request that looks like this:

POST https://staging.baserow.io/api/database/tables/database/{DATABASE_ID}/

{"name":"Table","data":[["Field"]],"first_row_header":true}

It will only have a primary text field named “Field”, without any other fields or rows.

Alternatively, you could use the duplicate feature. If you create a table that has all the fields that must, but you make sure it doesn’t have any rows, you can easily duplicate the table.

This can also be done via the API using this endpoint: Baserow API spec

Thank you for your response Bram.

What you’re showing in the video is exactly what I am trying to do , but from one source db to a destination db through API.

Is it possible to copy a table as it is from the source database to a destinationdb through API without modification?

This is unfortunately not yet possible. There is an issue (Migrate / move table into another database (#977) · Issues · Bram Wiepjes / baserow · GitLab) on the backlog that introduces moving a table into another database. Once that is finished, you should be able to duplicate a move to another database.

Thanks Bram. I ended up making it using your API in python.

import requests


class BaserowAPI:
   def __init__(self, username, password, server_url):
        self.username = username
        self.password = password
        self.server_url = server_url

   def generate_token(self):
        # Make the request to obtain the JWT token.
        token_request_url = f'{self.server_url}/api/user/token-auth/'
        
        response = requests.post(token_request_url, json={'email': self.username, 'password': self.password})
        print(response.text)
        response.raise_for_status()
        token = response.json()['token']
        return token




def get_database_table(table_id, jwt_token):
    api_url = f"{serverlink}/api/database/tables/{table_id}/"

    headers = {
        "Authorization": f"JWT {jwt_token}",
        "Content-Type": "application/json",
    }

    response = requests.get(api_url, headers=headers)

    if response.status_code == 200:
        return response.json()
    elif response.status_code in [400, 404]:
        response_data = response.json()
        raise ValueError(response_data["error"], response_data["detail"])
    else:
        response.raise_for_status()

def create_database_table(auth_token, database_id, name, data=None, first_row_header=False):
        # Define the API endpoint URL.
        url = f"{serverlink}/api/database/tables/database/{database_id}/"

        # Define the authorization token for the user.
#        auth_token = self.generate_token()

        # Define the headers for the HTTP request, including the authorization token.
        headers = {
            "Authorization": f"JWT {auth_token}",
            "Content-Type": "application/json",
        }

        # Define the payload for the HTTP request to create a new table.
        payload = {
            "name": name,
            "data": data,
            "first_row_header": first_row_header,
        }

        # Send the HTTP request to create a new table.
        response = requests.post(url, headers=headers, json=payload)

        # Parse the response JSON to extract the newly created table.
        if response.status_code == 200:
            new_table = response.json()
            return new_table['id']
            #print(f"New table created with ID {new_table['id']}")
        else:
            #print(response.text)
            return response.status_code
            #print(f"Request failed with status code {response.status_code}")


def create_baserow_database(server_url, workspace_id, database_name, token):
    # Define the API endpoint URL and workspace ID.
    url = f"{server_url}/api/applications/workspace/{workspace_id}/"

    # Define the authorization token for the user.
    auth_token = token

    # Define the headers for the HTTP request, including the authorization token.
    headers = {
        "Authorization": f"JWT {auth_token}",
        "Content-Type": "application/json",
    }

    # Define the payload for the HTTP request to create a new database application.
    payload = {
        "name": database_name,
        "type": "database",
        "init_with_data": False,
    }

    # Send the HTTP request to create a new database application.
    response = requests.post(url, headers=headers, json=payload)

    # Parse the response JSON to extract the newly created application.
    if response.status_code == 200:
        new_application = response.json()
        return new_application['id']
#        return f"New database '{database_name}' created with ID {new_application['id']}"
    else:
        return f"Request failed with status code {response.status_code}"



def update_database_table_field(field_id, server_url, auth_token, new_field_data):
    # Define the API endpoint URL.
    url = f"{server_url}/api/database/fields/{field_id}/"

    # Define the headers for the HTTP request, including the authorization token.
    headers = {
        "Authorization": f"JWT {auth_token}",
        "Content-Type": "application/json",
    }

    # Send the HTTP request to update the field of the table.
    response = requests.patch(url, headers=headers, json=new_field_data)

    # Check if the request was successful and print the updated field data.
    if response.status_code == 200:
        updated_field = response.json()
        print(f"Updated field ID {field_id}:")
    else:
        print(f"Request failed with status code {response.status_code}")

def get_first_field(table_id, server_url, auth_token):
    # Define the API endpoint URL.
    url = f"{server_url}/api/database/fields/table/{table_id}/"

    # Define the headers for the HTTP request, including the authorization token.
    headers = {
        "Authorization": f"JWT {auth_token}",
        "Content-Type": "application/json",
    }

    # Send the HTTP request to list the fields of the table.
    response = requests.get(url, headers=headers)

    # Parse the response JSON to extract the table fields.
    if response.status_code == 200:
        fields = response.json()
        for field in fields:
         return fields[0] if fields else None
    else:
        print(f"Request failed with status code {response.status_code}")
        return None


def copy_table_fields_skip_first(table_id,dest_table_id,auth_token,skip_n_fields=1,order_increment=0):
    fields_dict = {}
    # Define the API endpoint URL.
    server_url = serverlink
    url = f"{server_url}/api/database/fields/table/{table_id}/"

    # Define the headers for the HTTP request, including the authorization token.
    headers = {
        "Authorization": f"JWT {auth_token}",
        "Content-Type": "application/json",
    }

    # Send the HTTP request to list the fields of the table.
    response = requests.get(url, headers=headers)

    # Parse the response JSON to extract the table fields.
    if response.status_code == 200:
        fields = response.json()
        print(f"Fields for table ID {table_id}:")
        for field in fields[skip_n_fields:]:
            new_field = {k: v for k, v in field.items() if k not in ["id", "table_id", "read_only"]}
            new_field['order']=int(field['order']) + order_increment
            field_name, field_id = create_field_in_table(auth_token, dest_table_id, new_field)
            if field_name is not None and field_id is not None:
                     fields_dict[field_name] = field_id
    else:
        print(f"Request failed with status code {response.status_code}")
    return fields_dict


def create_field_in_table(jwt_token, table_id, field_json):
    base_url = serverlink
    headers = {"Authorization": f"JWT {jwt_token}"}

    response = requests.post(
        f"{base_url}/api/database/fields/table/{table_id}/",
        headers=headers,
        json=field_json,
    )

    if response.status_code == 200:
        response_data = response.json()
        field_name = response_data["name"]
        field_id = response_data["id"]
        print(f"Field '{field_name}' with Field ID '{field_id}' created successfully in the destination table")
        return field_name, field_id
    else:
        print(f"Failed to create field '{field_json['name']}' in the destination table: {response.status_code} - {response.text}")
        return None, None


def workspace_get_application(application_id, jwt_token):
    api_url = f"{serverlink}/api/applications/{application_id}/"

    headers = {
        "Authorization": f"JWT {jwt_token}",
        "Content-Type": "application/json",
    }

    response = requests.get(api_url, headers=headers)

    if response.status_code == 200:
        return response.json()
    elif response.status_code in [400, 404]:
        response_data = response.json()
        raise ValueError(response_data["error"], response_data["detail"])
    else:
        response.raise_for_status()


def list_database_tables(token, database_id):
    url = f"{serverlink}/api/database/tables/database/{database_id}/"

    headers = {
        "Authorization": f"JWT {token}",
        "Content-Type": "application/json",
    }

    response = requests.get(url, headers=headers)

    if response.status_code == 200:
        return response.json()
    elif response.status_code == 400:
        raise ValueError("Bad request")
    elif response.status_code == 404:
        raise ValueError("Not found")
    else:
        response.raise_for_status()

serverlink="http://api.baserow.com"

api = BaserowAPI(username='demo@admin.com', password='baserowpwd', server_url=serverlink)


# Generate the JWT token.
strtoken = api.generate_token()



def copytable(source_table_id,dest_database_id):
    source_table_name=get_database_table(source_table_id,strtoken)['name']
    dest_table_id = create_database_table(strtoken,dest_database_id,source_table_name, data=[[""],[""]], first_row_header=False)
    first_field_source=get_first_field(source_table_id,serverlink,strtoken)

    first_field_dest = get_first_field(dest_table_id, serverlink, strtoken)
    first_field_dest_id = first_field_dest['id']
    update_database_table_field(first_field_dest_id, serverlink, strtoken,first_field_source)
    copy_table_fields_skip_first(source_table_id,dest_table_id,strtoken)
 


def copy_database(strtoken,source_db_id,dest_workspace_id):
    dest_db_name=workspace_get_application(source_db_id,strtoken)['name']
    dest_db_id = create_baserow_database(serverlink,dest_workspace_id,dest_db_name,strtoken)
    for eachtable in list_database_tables(strtoken,source_db_id):
       copytable(eachtable['id'],dest_db_id)
    return dest_db_id

However it only works if the table does not have links. As for links, I think it’ll go slightly more deeper to check all those linked ids and replace them dynamically.

Also, I haven’t done the copying rows part since I only needed the table structure.