Keyword cannibalization detection SEO tool with Python

How To Detect Keyword Cannibalization On Website Using Python

In this post, we’re going to make a Python script for detecting keyword cannibalization on a website. Furthermore, we’ll achieve this by utilizing the Google Search Console API. Therefore, we need to register our website on Google Search Console first, before we can audit it.

Main requirement for this project is to authenticate the connection with this API. However, it’s a little bit of a process, so bear with me here. Before we start coding, I’m going to guide you step by step to get this thing working.

Setup

Okay, so the first thing you’ll need to take care of is create a Google Cloud project and enable the Google Search Console API. You can also find it under the Library tab searching for it in the search bar.

Next, you’ll need to setup a service account under Credentials tab, by clicking on the CREATE CREDENTIALS button and selecting Service account. From here, you’ll need to input its name, which will also create its ID. After you’ve chosen a name for it, you can just click Done button.

However, we’re not completely finished setting it up here. Once you create it, click on it to open its settings. You should see the following tabs: Details, Permissions, Keys, Metrics and Logs. Here you should navigate to the Keys tab and create a new key and select JSON option. This will download a JSON file containing your credentials.

For the final part of this setup, you need to log into Google Search Console dashboard and navigate to the Settings and Users and permissions. Here you’ll need to add a user and enter the email address (ID) of the service account. You can also set either full or restricted permissions for it.

You also need to copy the credentials JSON file into the project folder and rename it into credentials.json, since we’ll use it to authenticate our connection within Python script.

Coding keyword cannibalization detector script

Alright, now we can start working on our Python code. First of all, like with any other Python project, we need to import all the necessary modules.

import os
import argparse
import pandas as pd
from datetime import date
from dateutil.relativedelta import relativedelta
from google.oauth2 import service_account
from googleapiclient.discovery import build

Next, we’ll set a few constants and settings, which we’ll use later on.

ROOT = os.path.dirname(__file__)

API_SERVICE_NAME = 'webmasters'
API_VERSION = 'v3'
SCOPE = [
    'https://www.googleapis.com/auth/webmasters.readonly'
]

pd.set_option('max_colwidth', 100)

Now, in order for us to be able to access Google Search Console API, we need to create a service object. Moreover, we’ll handle this within a method, which will take a file path of credentials.json file as an argument.

def auth_service(credentials_path):

    credentials = service_account.Credentials.from_service_account_file(
        credentials_path,
        scopes=SCOPE
    )

    service = build(API_SERVICE_NAME, API_VERSION, credentials=credentials)

    return service

We’ll also create another method, which we’ll use to actually get the data from the API.

def query(service, url, payload):
    response = service.searchanalytics().query(siteUrl=url, body=payload).execute()

    results = []

    for row in response['rows']:
        data = {}

        for i in range(len(payload['dimensions'])):
            data[payload['dimensions'][i]] = row['keys'][i]

        data['clicks'] = row['clicks']
        data['impressions'] = row['impressions']
        data['ctr'] = round(row['ctr'] * 100, 2)
        data['position'] = round(row['position'], 2)

        results.append(data)
    
    return pd.DataFrame.from_dict(results)

Now, we’re ready to add it all together in the main thread, where we’re also going to define an argument parser. The purpose of this is, so we’ll be able to use the script directly from the command prompt.

We’ll add arguments for custom URL address, start and end date of the audited period and an argument for specific query.

NOTE: this will only work for URL addresses of websites that have the service account added as a user in your Google Search Console.

parser = argparse.ArgumentParser(
    description='Check website for keyword cannibalization.'
)

parser.add_argument(
    '-u',
    '--url',
    type=str,
    required=False,
    default=service.sites().list().execute()['siteEntry'][0]['siteUrl'],
    help='URL address of website to audit'
)

parser.add_argument(
    '-s',
    '--startDate',
    type=str,
    required=False,
    default=(date.today() - relativedelta(months=3)).strftime('%Y-%m-%d'),
    help='Start date of the audited period'
)

parser.add_argument(
    '-e',
    '--endDate',
    type=str,
    required=False,
    default=date.today().strftime('%Y-%m-%d'),
    help='End date of the audited period'
)

parser.add_argument(
    '-q',
    '--query',
    type=str,
    required=False,
    help='Specific query to analyze'
)

args = parser.parse_args()

Next, we’ll create a payload dictionary, within which we’ll pass the date arguments. This will allow us to fetch data for a custom time period as long as we format the date string correctly.

payload = {
    'startDate': args.startDate,
    'endDate': args.endDate,
    'dimensions': ['query', 'page'],
    'rowLimit': 10000,
    'startRow': 0
}

Now, we can get all the data for that time period.

df = query(service, args.url, payload)
df.to_csv(os.path.join(ROOT, 'all.csv'), index=False)
print(df.head())

With the following snippet, we can also examine the overall data.

data = {
    'Total pages': [int(df['page'].nunique())],
    'Total queries': [int(df['query'].nunique())],
    'Total clicks': [int(df['clicks'].sum())],
    'Total impressions': [int(df['impressions'].sum())],
    'Average CTR': [round(df['ctr'].mean(), 2)],
    'Average position': [round(df['position'].mean(), 2)],
    'Average queries per page': [round(int(df['query'].nunique()) / int(df['page'].nunique()), 2)]
}

df_stats = pd.DataFrame.from_dict(data)
print(df_stats.T.head(10))

And for the most important part of this guide, we’ll create a dataframe containing information for each query and how many pages rank for the same keyword.

df_cannibalized = df.groupby('query').agg(
    unique_pages=('page', 'nunique'),
    total_clicks=('clicks', 'sum'),
    total_impressions=('impressions', 'sum'),
    avg_ctr=('ctr', 'mean'),
    avg_position=('position', 'mean')
).sort_values(by='unique_pages', ascending=False)

df_cannibalized = df_cannibalized[df_cannibalized['unique_pages'] > 1]
df_cannibalized.to_csv(os.path.join(ROOT, 'cannibalized.csv'))
print(df_cannibalized)

We can also dig deeper and find out exactly which are these pages by searching for specific query.

if args.query:
    print(df[df['query']==args.query].sort_values(by='impressions', ascending=False))

That’s it! Now you have a fully functioning script for detecting keyword cannibalization on your website.

Entire keyword cannibalization project code

I’m also including a link to the GitHub repository, where you can check out the project.

import os
import argparse
import pandas as pd
from datetime import date
from dateutil.relativedelta import relativedelta
from google.oauth2 import service_account
from googleapiclient.discovery import build

ROOT = os.path.dirname(__file__)

API_SERVICE_NAME = 'webmasters'
API_VERSION = 'v3'
SCOPE = [
    'https://www.googleapis.com/auth/webmasters.readonly'
]

pd.set_option('max_colwidth', 100)

def auth_service(credentials_path):

    credentials = service_account.Credentials.from_service_account_file(
        credentials_path,
        scopes=SCOPE
    )

    service = build(API_SERVICE_NAME, API_VERSION, credentials=credentials)

    return service

def query(service, url, payload):
    response = service.searchanalytics().query(siteUrl=url, body=payload).execute()

    results = []

    for row in response['rows']:
        data = {}

        for i in range(len(payload['dimensions'])):
            data[payload['dimensions'][i]] = row['keys'][i]

        data['clicks'] = row['clicks']
        data['impressions'] = row['impressions']
        data['ctr'] = round(row['ctr'] * 100, 2)
        data['position'] = round(row['position'], 2)

        results.append(data)
    
    return pd.DataFrame.from_dict(results)


if __name__ == '__main__':

    service = auth_service(os.path.join(ROOT, 'credentials.json'))

    parser = argparse.ArgumentParser(
        description='Check website for keyword cannibalization.'
    )

    parser.add_argument(
        '-u',
        '--url',
        type=str,
        required=False,
        default=service.sites().list().execute()['siteEntry'][0]['siteUrl'],
        help='URL address of website to audit'
    )

    parser.add_argument(
        '-s',
        '--startDate',
        type=str,
        required=False,
        default=(date.today() - relativedelta(months=3)).strftime('%Y-%m-%d'),
        help='Start date of the audited period'
    )

    parser.add_argument(
        '-e',
        '--endDate',
        type=str,
        required=False,
        default=date.today().strftime('%Y-%m-%d'),
        help='End date of the audited period'
    )

    parser.add_argument(
        '-q',
        '--query',
        type=str,
        required=False,
        help='Specific query to analyze'
    )

    args = parser.parse_args()

    payload = {
        'startDate': args.startDate,
        'endDate': args.endDate,
        'dimensions': ['query', 'page'],
        'rowLimit': 10000,
        'startRow': 0
    }

    df = query(service, args.url, payload)
    df.to_csv(os.path.join(ROOT, 'all.csv'), index=False)
    print(df.head())

    data = {
        'Total pages': [int(df['page'].nunique())],
        'Total queries': [int(df['query'].nunique())],
        'Total clicks': [int(df['clicks'].sum())],
        'Total impressions': [int(df['impressions'].sum())],
        'Average CTR': [round(df['ctr'].mean(), 2)],
        'Average position': [round(df['position'].mean(), 2)],
        'Average queries per page': [round(int(df['query'].nunique()) / int(df['page'].nunique()), 2)]
    }

    df_stats = pd.DataFrame.from_dict(data)
    print(df_stats.T.head(10))

    df_summary = df.groupby('query').agg(
        unique_pages=('page', 'nunique'),
        total_clicks=('clicks', 'sum'),
        total_impressions=('impressions', 'sum'),
        avg_ctr=('ctr', 'mean'),
        avg_position=('position', 'mean')
    ).sort_values(by='total_clicks', ascending=False)
    print(df_summary.head(10))

    df_cannibalized = df.groupby('query').agg(
        unique_pages=('page', 'nunique'),
        total_clicks=('clicks', 'sum'),
        total_impressions=('impressions', 'sum'),
        avg_ctr=('ctr', 'mean'),
        avg_position=('position', 'mean')
    ).sort_values(by='unique_pages', ascending=False)

    df_cannibalized = df_cannibalized[df_cannibalized['unique_pages'] > 1]
    df_cannibalized.to_csv(os.path.join(ROOT, 'cannibalized.csv'))
    print(df_cannibalized)

    if args.query:
        print(df[df['query']==args.query].sort_values(by='impressions', ascending=False))

And you can use the following command structure if you want to use it in command prompt.

python main.py -u example.com -s 2023-01-01 -e 2023-12-25 -q "example keyword"

Conclusion

To conclude, we made a Python script for detecting keyword cannibalization on a website that we track on Google Search Console. I learned a lot while working on this project and I hope you will find it helpful as well.