Batch upload from BigQuery to Aito with 20 lines of Python code

There is a better way

Refuel your ML model with new data on the fly, photo from Defense News.

Segment.com and BigQuery

In this example the source data is web analytics collected with Segment.com and synced into BigQuery (the aito_ai_website.pages table). From there we batch-upload it into an Aito table called aito_segment_web.

Prerequisites

  • You need an Aito instance. Free Sandboxes are available, so sign up.
  • Have source data that you want to push into Aito and use for predictions, preferably in BigQuery or some other database. If you just want to play around, the world is full of free datasets. If you have your own Google Analytics account, follow the instructions for exporting the data into BigQuery. If not, you can use the Google Analytics sample data, but remember that it does not update continuously, so you will need to fake new entries to test the daily batch uploads.
  • Have the necessary Python packages installed for connecting to your database and, of course, to Aito: pip install aitoai.
  • Have you already created the schema in Aito for your data? You’ll need to do this as well. Essentially, you can just drop an example file in the Aito Console and watch the schema get created automatically, or DIY with a bit of help from the docs; the schema used in this example is shown below.

Can we achieve our goal with 20 lines of code?

  1. Fetch new entries (based on the previous max timestamp in Aito) from BigQuery into a dataframe
  2. Upload the dataframe to Aito
  3. Keep only a recent snapshot of the data in Aito, cleaning out data points that are older than X days
  4. EXTRA: Run an evaluation to track prediction accuracy, and write the result to a file for monitoring purposes

For reference, here is the Aito table schema for aito_segment_web (Segment page view data) that the script writes into:
{
  "type": "table",
  "columns": {
    "anonymous_id": {
      "link": "aito_segment_user.id",
      "nullable": false,
      "type": "String"
    },
    "context_campaign_medium": {
      "nullable": true,
      "type": "String"
    },
    "context_campaign_name": {
      "nullable": true,
      "type": "String"
    },
    "context_campaign_source": {
      "nullable": true,
      "type": "String"
    },
    "context_ip": {
      "nullable": false,
      "type": "String"
    },
    "context_locale": {
      "nullable": false,
      "type": "String"
    },
    "context_page_path": {
      "nullable": false,
      "type": "String"
    },
    "context_page_referrer": {
      "nullable": true,
      "type": "String"
    },
    "context_page_title": {
      "nullable": true,
      "type": "String"
    },
    "context_user_agent": {
      "nullable": false,
      "type": "String"
    },
    "id": {
      "nullable": false,
      "type": "String"
    },
    "unix_timestamp": {
      "nullable": false,
      "type": "Int"
    },
    "referrer": {
      "nullable": true,
      "type": "String"
    },
    "search": {
      "nullable": true,
      "type": "Text",
      "analyzer": "en"
    },
    "url": {
      "nullable": false,
      "type": "String"
    }
  }
}
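If you go the DIY route, the table schema above can also be pushed over Aito's API instead of through the Console. Note that the anonymous_id column links to an aito_segment_user table, so that table needs to exist first. Here is a minimal sketch, assuming the schema JSON is saved as schema.json and that PUT /api/v1/schema/aito_segment_web is the single-table schema endpoint; verify both against the docs of your Aito version.

# Rough sketch: create the aito_segment_web table from the schema shown above.
# The endpoint and payload shape are assumptions to check against Aito's schema API docs.
import json
import requests

AITO_URL = "http://YOUR-OWN-AITO-URL.api.aito.ai"
HEADERS = {"x-api-key": "YOUR-OWN-AITO-API-KEY", "content-type": "application/json"}

with open("schema.json") as f:  # the table schema shown above
    table_schema = json.load(f)

resp = requests.put(AITO_URL + "/api/v1/schema/aito_segment_web",
                    headers=HEADERS, json=table_schema)
print(resp.status_code, resp.text)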

Finally, there is some code!

First, the imports and the connections to BigQuery and Aito:

import time  # used later for computing the cleanup cutoff
import pandas as pd
import pandas_gbq
from google.oauth2 import service_account
from aito.sdk.aito_client import AitoClient

# BigQuery credentials and project for pandas-gbq
pandas_gbq.context.credentials = service_account.Credentials.from_service_account_file('YOUR-OWN-FILE.json')
pandas_gbq.context.project = 'YOUR-OWN-GCP-PROJECT'

# Aito instance URL and API key
aito = AitoClient("http://YOUR-OWN-AITO-URL.api.aito.ai", "YOUR-OWN-AITO-API-KEY")

Get data from BigQuery

# Find the newest timestamp already in Aito; we only fetch rows newer than this
body = {
    "from": "aito_segment_web",
    "orderBy": { "$desc": "unix_timestamp" },
    "limit": 1,
    "select": ["unix_timestamp"]
}
batch_start_unixtime = aito.request(
    method='POST',
    endpoint='/api/v1/_search',
    query=body
)['hits'][0]['unix_timestamp']

# Fetch from BigQuery only the rows that are newer than the latest entry in Aito
sql = """
SELECT
  anonymous_id,
  context_campaign_medium,
  context_campaign_name,
  context_campaign_source,
  context_ip,
  context_locale,
  context_page_path,
  context_page_referrer,
  context_page_title,
  context_user_agent,
  id,
  UNIX_SECONDS(timestamp) as unix_timestamp,
  referrer,
  search,
  url
FROM aito_ai_website.pages
WHERE
  UNIX_SECONDS(timestamp) > {starttime};
""".format(starttime=str(batch_start_unixtime))

df = pandas_gbq.read_gbq(sql)
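One caveat: the lookup above assumes aito_segment_web already contains at least one row. On the very first run the _search returns no hits and the [0] indexing fails. A minimal guard, with an assumed 90-day backfill window, could replace the direct lookup:

# Hypothetical first-run guard: fall back to a fixed backfill window when Aito is still empty
hits = aito.request(
    method='POST',
    endpoint='/api/v1/_search',
    query=body
)['hits']
if hits:
    batch_start_unixtime = hits[0]['unix_timestamp']
else:
    # assumption: backfill the last 90 days on the very first batch
    batch_start_unixtime = int(time.time()) - (90 * 24 * 60 * 60)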

Upload to Aito

# Push the new rows to Aito in batches
aito.upload_entries_by_batches(
    "aito_segment_web",
    df.to_dict(orient="records"))
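Depending on your pandas version, df.to_dict(orient="records") can hand back NaN values and numpy scalar types that do not serialize cleanly to JSON. If the upload complains, a small coercion step like this sketch of mine usually does the trick:

# Hypothetical pre-upload cleanup: NaN becomes null, numpy scalars become plain Python types
import json

entries = json.loads(df.to_json(orient="records"))
aito.upload_entries_by_batches("aito_segment_web", entries)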

Janitorial duties

# Keep only the last 90 days of data in Aito
cutoff_unix_sec = int(time.time()) - (90 * 24 * 60 * 60)  # days * hours * minutes * seconds

del_body = {
    "from": "aito_segment_web",
    "where": { "unix_timestamp": { "$lt": cutoff_unix_sec } }
}
aito.request(
    method='POST',
    endpoint='/api/v1/data/_delete',
    query=del_body
)
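If you want to keep an eye on the table size after the cleanup, the same _search endpoint works as a quick sanity check. I am assuming here that the response carries a total field, so double-check that against your Aito version:

# Optional sanity check: how many rows remain after the cleanup?
count_body = { "from": "aito_segment_web", "limit": 1 }
remaining = aito.request(
    method='POST',
    endpoint='/api/v1/_search',
    query=count_body
)['total']  # assumption: _search responses include a 'total' count
print("Rows in aito_segment_web after cleanup:", remaining)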

Evaluate accuracy with the new dataset

  • Define an evaluate query body.
  • Send it to Aito as a job, since evaluations usually take longer than the normal query timeout of 30 seconds.
  • After getting the results, append the vital statistics, like sample size, error rate and accuracy, to a file for later examination (a rough sketch of what this could look like follows below).
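The evaluation step is not part of the original 20 lines, so here is only a minimal sketch. The 10% test split, the choice of context_campaign_source as the target, the fields used as evidence, and the jobs endpoints and polling logic are all my assumptions; verify them against Aito's API documentation and your own prediction use case.

# Rough sketch only: evaluate prediction accuracy on a sample and log the result.
# Endpoints, field choices and polling logic are assumptions to verify against Aito's docs.
import json
import time
import requests

AITO_URL = "http://YOUR-OWN-AITO-URL.api.aito.ai"
HEADERS = {"x-api-key": "YOUR-OWN-AITO-API-KEY", "content-type": "application/json"}

evaluate_body = {
    # every 10th row acts as the test set (assumed split)
    "test": { "$index": { "$mod": [10, 0] } },
    "evaluate": {
        "from": "aito_segment_web",
        "where": {
            "context_page_path": { "$get": "context_page_path" },
            "context_user_agent": { "$get": "context_user_agent" }
        },
        # example target field; pick whatever you actually predict
        "match": "context_campaign_source"
    }
}

# Run the evaluation as a job, since it can exceed the 30 second query timeout
job = requests.post(AITO_URL + "/api/v1/jobs/_evaluate",
                    headers=HEADERS, json=evaluate_body).json()

# Poll until the result is available (simplistic polling, assumed contract)
while True:
    result_resp = requests.get(AITO_URL + "/api/v1/jobs/" + job["id"] + "/result",
                               headers=HEADERS)
    if result_resp.status_code == 200:
        result = result_resp.json()
        break
    time.sleep(10)

# Append the result (accuracy, error rate, sample counts, ...) to a file for monitoring
with open("evaluate_log.jsonl", "a") as f:
    f.write(json.dumps({"ts": int(time.time()), "result": result}) + "\n")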

Summary

That is it. With roughly 20 lines of Python we fetch the new rows from BigQuery, upload them to Aito, prune data points older than 90 days, and optionally run an evaluation job to keep track of prediction accuracy as the data changes. Schedule the script to run daily and your Aito instance keeps getting refueled with fresh data.
