Developer Tutorial

Bulk LinkedIn Enrichment

Turn a thin spreadsheet of names into thousands of complete, CRM-ready records — email, phone, LinkedIn, and firmographics — through one REST endpoint. Every call is 1 credit, with no per-result multipliers.

Enriching one lead is easy. Enriching fifty thousand is a different problem: now you care about throughput, rate limits, retries, resumability, and — above all — not setting fire to your credit balance on rows you didn't need. Bulk enrichment is less about the lookup and more about the loop around it.

This tutorial builds a production-grade bulk pipeline against the LinkFinder AI API end to end: cleaning and de-duplicating your input, enriching a CSV row by row, layering in safe concurrency that respects your plan's rate limit, making the whole job resumable so a crash doesn't cost you, controlling spend, and pushing the finished records into your CRM. Everything runs against a single endpoint — POST https://api.linkfinderai.com — and one request costs one credit, including lookups that come back empty, so cost stays easy to forecast.

Examples are in Python and Node.js. The free tier's 100 credits are enough to test the full flow on a small list before you scale it up.

1

Prepare and de-duplicate your list

This is the single highest-leverage step, and most people skip it. Because every lookup costs a credit — even one that returns nothing — a messy input list is money straight down the drain. Clean before you spend.

  • De-duplicate on a stable key (LinkedIn URL if you have it, otherwise normalized name + company). Duplicates are pure waste.
  • Normalize company names and trim whitespace so "CloudCore " and "cloudcore" collapse to one row.
  • Drop rows you can't act on — no name and no identifier means no match, just a wasted credit.
  • Skip what you already have. Don't re-enrich rows your CRM already filled in last month.

import csv

def normalize(s):
    return (s or "").strip().lower()

seen, clean_rows = set(), []
with open("leads_raw.csv", newline="") as f:
    for row in csv.DictReader(f):
        name, company = row.get("full_name", ""), row.get("company", "")
        if not name.strip():
            continue                         # nothing to match on
        key = (normalize(name), normalize(company))
        if key in seen:
            continue                         # duplicate -> skip
        seen.add(key)
        clean_rows.append({"full_name": name.strip(), "company": company.strip()})

with open("leads.csv", "w", newline="") as f:
    w = csv.DictWriter(f, fieldnames=["full_name", "company"])
    w.writeheader(); w.writerows(clean_rows)

print(f"Kept {len(clean_rows)} unique leads.")

On a list of 50,000 raw rows, it's common to find 20–30% duplicates or dead rows. That is 10,000 to 15,000 rows you never pay to look up, so cleaning first can cut your bill by thousands of credits before you make a single call.

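The first bullet in this step suggests keying on the LinkedIn URL when you have one and falling back to normalized name + company. Here is a minimal sketch of that key choice. The `linkedin_url` column name and the URL clean-up rules are assumptions about your input file, not something the API requires.

```python
import re

def normalize(s):
    """Lowercase, trim, and collapse internal runs of whitespace."""
    return re.sub(r"\s+", " ", (s or "").strip().lower())

def dedupe_key(row):
    """Prefer the LinkedIn URL as the key; fall back to name + company.

    The "linkedin_url" column name is an assumption about your input file.
    """
    url = normalize(row.get("linkedin_url"))
    if url:
        # Ignore query strings and trailing slashes so URL variants collapse.
        return ("url", url.split("?", 1)[0].rstrip("/"))
    return ("name", normalize(row.get("full_name")), normalize(row.get("company")))

def dedupe(rows):
    """Keep the first row seen for each key, preserving input order."""
    seen, kept = set(), []
    for row in rows:
        key = dedupe_key(row)
        if key in seen:
            continue
        seen.add(key)
        kept.append(row)
    return kept

rows = [
    {"full_name": "Sarah Mitchell", "company": "CloudCore "},
    {"full_name": "sarah  mitchell", "company": "cloudcore"},
    {"full_name": "S. Mitchell", "linkedin_url": "https://www.linkedin.com/in/sarah-mitchell/"},
    {"full_name": "Sarah M.", "linkedin_url": "https://www.linkedin.com/in/sarah-mitchell?trk=x"},
]
print(len(dedupe(rows)))   # 2
```

One limitation: a row keyed by URL and a row keyed by name are never compared with each other, so the same person can survive once under each key type. If your list mixes both, consider a second pass after the URL lookup.
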
2

Set up the API and a single enrichment

Grab your API key from Settings → API Key and store it as an environment variable — never hardcode it. Every request is a POST with a type (which enrichment) and input_data (what you know), authenticated with a Bearer token.

Before looping over thousands of rows, wrap a single call in a helper that checks the response status and returns None on a miss. A clean per-row chain then takes a name and company, resolves the LinkedIn URL, and appends email, phone, and the company website (the firmographic field used in this tutorial):

import os, requests

API_KEY = os.environ["LINKFINDER_API_KEY"]
BASE_URL = "https://api.linkfinderai.com"

def call(enrichment_type, input_data, **extra):
    """One request = one credit. Returns result or None."""
    resp = requests.post(
        BASE_URL,
        headers={"Authorization": f"Bearer {API_KEY}",
                 "Content-Type": "application/json"},
        json={"type": enrichment_type, "input_data": input_data, **extra},
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    return data.get("result") if data.get("status") == "success" else None

def enrich_lead(full_name, company):
    lead = {"full_name": full_name, "company": company}
    url = call("lead_full_name_to_linkedin_url", f"{full_name} {company}")
    if not url:
        return lead                         # nothing to chain off of
    lead["linkedin"] = url
    lead["email"]   = call("linkedin_profile_to_email", url)
    lead["phone"]   = call("linkedin_profile_to_phone", url)
    lead["website"] = call("company_name_to_website", company)
    return lead

print(enrich_lead("Sarah Mitchell", "CloudCore"))

// Node.js version of the same helper and chain
const API_KEY = process.env.LINKFINDER_API_KEY;
const BASE_URL = "https://api.linkfinderai.com";

async function call(type, input_data, extra = {}) {
  const resp = await fetch(BASE_URL, {
    method: "POST",
    headers: { "Authorization": `Bearer ${API_KEY}`,
               "Content-Type": "application/json" },
    body: JSON.stringify({ type, input_data, ...extra }),
  });
  if (!resp.ok) throw new Error(`HTTP ${resp.status}`);  // mirror raise_for_status()
  const data = await resp.json();
  return data.status === "success" ? data.result : null;
}

async function enrichLead(fullName, company) {
  const lead = { fullName, company };
  const url = await call("lead_full_name_to_linkedin_url", `${fullName} ${company}`);
  if (!url) return lead;
  lead.linkedin = url;
  lead.email   = await call("linkedin_profile_to_email", url);
  lead.phone   = await call("linkedin_profile_to_phone", url);
  lead.website = await call("company_name_to_website", company);
  return lead;
}

// Top-level await: run this file as an ES module (for example, a .mjs file).
console.log(await enrichLead("Sarah Mitchell", "CloudCore"));

That chain uses up to four credits per lead. Trim it to only the fields you actually need: if you just want emails, skip the phone and website calls and the cost per matched lead drops from four credits to two.

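One way to make the chain trimmable is to pass in the list of fields you want and count credits as you go. The sketch below is illustrative: `enrich_selected` and `FIELD_TYPES` are hypothetical names, and the `call` argument is injected so the example runs offline with a stub instead of the real helper from this step.

```python
# Enrichment type names are the ones used in this tutorial's chain.
FIELD_TYPES = {
    "email": "linkedin_profile_to_email",
    "phone": "linkedin_profile_to_phone",
}

def enrich_selected(full_name, company, fields=("email",), call=None):
    """Run only the requested lookups; returns (lead, credits_spent).

    `call` is injected so the chain can be exercised with a stub; in the
    pipeline you would pass the call() helper defined in this step.
    """
    lead = {"full_name": full_name, "company": company}
    credits = 1                                   # the URL lookup always runs
    url = call("lead_full_name_to_linkedin_url", f"{full_name} {company}")
    if not url:
        return lead, credits                      # short-circuit: 1 credit total
    lead["linkedin"] = url
    for field in fields:
        if field == "website":
            lead["website"] = call("company_name_to_website", company)
        else:
            lead[field] = call(FIELD_TYPES[field], url)
        credits += 1                              # empty results still cost a credit
    return lead, credits

# Stub standing in for the real API so the example runs offline.
def fake_call(enrichment_type, input_data):
    return {
        "lead_full_name_to_linkedin_url": "https://linkedin.example/in/sarah",
        "linkedin_profile_to_email": "sarah@example.com",
    }.get(enrichment_type)

lead, spent = enrich_selected("Sarah Mitchell", "CloudCore", fields=("email",), call=fake_call)
print(spent, lead["email"])   # 2 sarah@example.com
```

Returning the credit count alongside the lead makes it easy to total real spend during a run rather than estimating it afterwards.
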
3

Run the bulk enrichment loop

The single-lead function scales straight to a list. Read the cleaned CSV, enrich each row, and stream results to an output file as you go rather than holding 50,000 records in memory. The one rule: pace the loop so the requests it sends stay under your plan's requests-per-second limit.

import csv, time

INPUT, OUTPUT = "leads.csv", "leads_enriched.csv"
REQS_PER_SEC = 5            # match your plan (Starter = 5/s)
DELAY = 1.0 / REQS_PER_SEC

with open(INPUT, newline="") as f_in, open(OUTPUT, "w", newline="") as f_out:
    reader = csv.DictReader(f_in)
    fields = ["full_name", "company", "linkedin", "email", "phone", "website"]
    writer = csv.DictWriter(f_out, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()

    for i, row in enumerate(reader, 1):
        lead = enrich_lead(row["full_name"], row["company"])
        writer.writerow(lead)
        f_out.flush()                       # persist as you go
        print(f"[{i}] {lead['full_name']} -> {lead.get('email')}")
        time.sleep(DELAY * 4)               # up to 4 requests per lead -> budget 4 rate-limit slots

print("Done.")

This is correct and safe, but serial. At roughly one chain per second you'll process about 3,600 leads an hour, which is fine for a nightly job but slow for a big backlog. Step 4 speeds it up without tripping the rate limit.
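To see where the "one chain per second" figure comes from, and how far it sits from the rate-limit ceiling, a back-of-the-envelope calculation helps. The 0.2 s request latency below is an assumed value for illustration, not a measured one, and `pause_s` is whatever your loop sleeps per lead.

```python
def serial_leads_per_hour(calls_per_lead, latency_s, pause_s):
    """Leads/hour for a serial loop: each lead costs its requests plus the pause."""
    seconds_per_lead = calls_per_lead * latency_s + pause_s
    return 3600 / seconds_per_lead

def ceiling_leads_per_hour(reqs_per_sec, calls_per_lead):
    """Best case any pipeline can reach when every request counts against the limit."""
    return reqs_per_sec / calls_per_lead * 3600

# 0.2 s per request is an assumed latency, not a measured one.
print(round(serial_leads_per_hour(4, latency_s=0.2, pause_s=0.2)))   # 3600
print(round(ceiling_leads_per_hour(5, 4)))                           # 4500
```

The gap between the two numbers is what concurrency can recover. Beyond the ceiling, only a higher plan limit or a shorter chain makes the run faster.
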

4

Add safe concurrency under your rate limit

The fastest correct bulk pipeline runs several leads in parallel while a shared limiter keeps total throughput under your plan's ceiling. The pattern: a worker pool bounded by your requests-per-second, with each worker processing one lead's chain at a time.

import csv, time, threading
from concurrent.futures import ThreadPoolExecutor

REQS_PER_SEC = 5            # your plan's ceiling
_lock = threading.Lock()
_last = [0.0]

def rate_limited():
    """Simple shared throttle: spaces requests 1/REQS_PER_SEC apart across threads."""
    with _lock:
        wait = (1.0 / REQS_PER_SEC) - (time.monotonic() - _last[0])
        if wait > 0:
            time.sleep(wait)
        _last[0] = time.monotonic()

# Throttle every request, not every lead: one chain makes up to four calls.
# enrich_lead() resolves `call` at run time, so wrapping it here is enough
# (assumes this runs in the same module as the Step 2 helpers).
_unthrottled_call = call

def call(enrichment_type, input_data, **extra):
    rate_limited()
    return _unthrottled_call(enrichment_type, input_data, **extra)

def worker(row):
    return enrich_lead(row["full_name"], row["company"])

with open("leads.csv", newline="") as f:
    rows = list(csv.DictReader(f))

with open("leads_enriched.csv", "w", newline="") as f_out, \
     ThreadPoolExecutor(max_workers=REQS_PER_SEC) as pool:
    fields = ["full_name", "company", "linkedin", "email", "phone", "website"]
    writer = csv.DictWriter(f_out, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    for lead in pool.map(worker, rows):     # results come back in input order
        writer.writerow(lead)
        f_out.flush()

print("Done (concurrent).")
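
// Bounded concurrency: N workers pulling from one shared queue,
// with a shared throttle that spaces individual requests.
const REQS_PER_SEC = 5;     // match your plan's req/s ceiling
const CONCURRENCY = 5;      // leads in flight at once

let nextSlot = 0;
async function throttle() {
  // Reserve the next free 1/REQS_PER_SEC slot, then wait for it.
  const now = Date.now();
  const at = Math.max(now, nextSlot);
  nextSlot = at + 1000 / REQS_PER_SEC;
  await new Promise(r => setTimeout(r, at - now));
}
// Add `await throttle();` as the first line of call() from Step 2 so every
// request in a chain, not just every lead, counts against the limit.

async function runBulk(rows, writeRow) {
  let i = 0;
  async function worker() {
    while (i < rows.length) {
      const row = rows[i++];
      const lead = await enrichLead(row.full_name, row.company);
      writeRow(lead);
    }
  }
  await Promise.all(Array.from({ length: CONCURRENCY }, worker));
}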

Concurrency changes how quickly you reach the rate limit, not how high it is. Cap your worker count at your plan's req/s, and remember that every request inside a lead's chain counts toward the limit: a 4-call chain at 5 req/s works out to roughly one lead per second (5 / 4 = 1.25), not five.

| Plan | Credits / mo | Requests / sec | Batch size |
|---|---|---|---|
| Starter | 5,000 | 5 req/s | Up to 500 |
| Professional | 20,000 | 10 req/s | Up to 500 |
| Enterprise | 50,000 | 20 req/s | Up to 500 |
| HyperGrowth | 250,000 | 50 req/s | Up to 500 |

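The plan figures above translate directly into two planning questions: does a run fit in a month's credits, and what is the shortest it could take at the plan's rate limit? The sketch below is a rough calculator using the table's numbers. `plan_fit` is a hypothetical helper, and the default of four calls per lead assumes the full chain from Step 2.

```python
# Figures copied from the plan table in this step.
PLANS = {
    "Starter":      {"credits_per_month": 5_000,   "reqs_per_sec": 5},
    "Professional": {"credits_per_month": 20_000,  "reqs_per_sec": 10},
    "Enterprise":   {"credits_per_month": 50_000,  "reqs_per_sec": 20},
    "HyperGrowth":  {"credits_per_month": 250_000, "reqs_per_sec": 50},
}

def plan_fit(rows, calls_per_lead=4):
    """For each plan: worst-case credits, whether they fit, and minimum run time."""
    needed = rows * calls_per_lead
    report = {}
    for name, plan in PLANS.items():
        report[name] = {
            "credits_needed": needed,
            "fits_monthly_credits": needed <= plan["credits_per_month"],
            # Lower bound: every request sent back to back at the plan's limit.
            "min_minutes": needed / plan["reqs_per_sec"] / 60,
        }
    return report

for name, r in plan_fit(12_000).items():
    print(f"{name:<13} fits={r['fits_monthly_credits']!s:<5} "
          f"min run ~{r['min_minutes']:.0f} min")
```

For 12,000 leads with a four-call chain, the worst case is 48,000 credits, which fits the Enterprise and HyperGrowth allowances but not Starter or Professional. Real spend is usually lower because missed URL lookups short-circuit the chain.
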
5

Make the job resumable and retry failures

A bulk run is a long-running job, and long jobs crash — a network blip, a timeout, an out-of-credits error at row 38,000. Without resumability you either re-spend thousands of credits or lose work. Two safeguards fix this: a checkpoint of completed keys, and retry-with-backoff on transient failures.

import os, csv, time, requests

DONE_FILE = "done_keys.txt"

def load_done():
    if not os.path.exists(DONE_FILE):
        return set()
    with open(DONE_FILE) as f:
        return set(line.strip() for line in f)

def mark_done(key):
    with open(DONE_FILE, "a") as f:
        f.write(key + "\n")

def call(enrichment_type, input_data, max_retries=4, **extra):
    """Step 2's call() plus backoff. API_KEY and BASE_URL come from Step 2."""
    delay = 1.0
    for _ in range(max_retries):
        try:
            resp = requests.post(
                BASE_URL,
                headers={"Authorization": f"Bearer {API_KEY}",
                         "Content-Type": "application/json"},
                json={"type": enrichment_type, "input_data": input_data, **extra},
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout):
            time.sleep(delay); delay *= 2      # network blip or timeout -> retry
            continue
        if resp.status_code in (429, 500):
            time.sleep(delay); delay *= 2      # 1s -> 2s -> 4s -> 8s
            continue
        if resp.status_code == 402:
            raise RuntimeError("Out of credits: top up to resume.")
        resp.raise_for_status()
        data = resp.json()
        return data.get("result") if data.get("status") == "success" else None
    raise RuntimeError("Exhausted retries after repeated transient failures.")

done = load_done()
with open("leads.csv", newline="") as f:
    for row in csv.DictReader(f):
        key = f"{row['full_name']}|{row['company']}".lower()
        if key in done:
            continue                          # already enriched -> skip
        lead = enrich_lead(row["full_name"], row["company"])
        # ...write lead to output...
        mark_done(key)

The 402 Insufficient credits error is the one to handle loudly: catch it, stop cleanly, top up, and re-run. Because completed keys are checkpointed, the resumed run skips every lead already marked done and does not pay for it again.

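"Stop cleanly" can be sketched as a loop that treats running out of credits as a normal exit rather than a crash. This example is an assumption-heavy illustration: `OutOfCredits` is a hypothetical exception class (the tutorial's `call()` raises a plain `RuntimeError` on 402), and the enrich and checkpoint functions are passed in so the example runs with stubs.

```python
class OutOfCredits(RuntimeError):
    """Hypothetical exception type raised when the API answers 402."""

def run(rows, done, enrich, mark_done):
    """Process rows not yet in `done`; stop cleanly if credits run out.

    Returns (processed, stopped_early).
    """
    processed = 0
    for row in rows:
        key = f"{row['full_name']}|{row['company']}".lower()
        if key in done:
            continue                    # already enriched on an earlier run
        try:
            enrich(row)
        except OutOfCredits:
            return processed, True      # checkpoint is intact; re-run after topping up
        mark_done(key)
        done.add(key)
        processed += 1
    return processed, False

rows = [{"full_name": f"Lead {n}", "company": "CloudCore"} for n in range(1, 4)]
done, budget = set(), [2]               # pretend only two leads' worth of credits remain

def fake_enrich(row):
    if budget[0] == 0:
        raise OutOfCredits("Out of credits")
    budget[0] -= 1

first = run(rows, done, fake_enrich, mark_done=lambda key: None)
budget[0] = 10                          # "top up"
second = run(rows, done, fake_enrich, mark_done=lambda key: None)
print(first, second)                    # (2, True) (1, False)
```

The lead that hit the 402 is not marked done, so the second run retries it and nothing else.
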
6

Control credit spend

The cost of a bulk run is predictable: rows × calls-per-row × 1 credit, and empty results count. That makes spend a design decision, not a surprise on the invoice. The levers, from biggest to smallest:

| Lever | Effect | How |
|---|---|---|
| De-dupe input | Big | Remove duplicate & dead rows before any call (Step 1) |
| Trim the chain | Big | Only request fields you'll use; drop phone/website if unneeded |
| Skip enriched rows | Medium | Don't re-run records your CRM already has |
| Cache results | Medium | Store responses so a re-run never re-pays for a hit |
| Short-circuit chains | Medium | If the LinkedIn URL is null, stop; don't pay for downstream calls |

A quick cost estimate before you launch keeps the bill honest:

rows = 12_000          # unique leads after de-duping
calls_per_lead = 4     # url + email + phone + website
worst_case = rows * calls_per_lead
print(f"Upper bound: {worst_case:,} credits")
# Real spend is lower: chains that miss the LinkedIn URL
# short-circuit and never make the 3 downstream calls.

A lookup that returns null still costs 1 credit. The cheapest credit is the one you never spend: clean the list, trim the chain, and short-circuit dead leads before you scale up.

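The "Cache results" lever can be as simple as a small wrapper around the request helper. The sketch below is one possible shape, not a prescribed design: `CachedCall` and the cache file name are hypothetical, and it stores hits only, on the assumption that a miss today might resolve on a later run.

```python
import json, os, tempfile

class CachedCall:
    """Wraps a call(enrichment_type, input_data) function with a JSON-file cache."""

    def __init__(self, call, path="enrich_cache.json"):
        self.call, self.path = call, path
        self.cache = {}
        if os.path.exists(path):
            with open(path) as f:
                self.cache = json.load(f)

    def __call__(self, enrichment_type, input_data):
        key = f"{enrichment_type}|{input_data}"
        if key in self.cache:
            return self.cache[key]              # no request, no credit
        result = self.call(enrichment_type, input_data)
        if result is not None:                  # cache hits only; misses may resolve later
            self.cache[key] = result
            with open(self.path, "w") as f:     # rewrite the file so a crash loses nothing
                json.dump(self.cache, f)
        return result

# Stub standing in for the real API; it records every request it receives.
calls = []
def fake_call(enrichment_type, input_data):
    calls.append(input_data)
    return "sarah@example.com" if input_data == "known-url" else None

path = os.path.join(tempfile.mkdtemp(), "enrich_cache.json")
cached = CachedCall(fake_call, path)
cached("linkedin_profile_to_email", "known-url")
cached("linkedin_profile_to_email", "known-url")      # served from the cache
resumed = CachedCall(fake_call, path)                 # simulates a re-run
resumed("linkedin_profile_to_email", "known-url")     # still no new request
print(len(calls))                                     # 1
```

Rewriting the whole JSON file on every hit is fine for a few thousand entries. For very large runs, an append-only file or SQLite would likely scale better.
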
7

Sync enriched leads to your CRM

Once the enriched CSV is written, get it where your team works. Two paths:

Option A — Direct from code

Batch-upsert the enriched rows into your CRM's API after the run. Map email, phone, and linkedin onto your contact fields and use the LinkedIn URL (or email) as the dedupe key so re-runs update rather than duplicate.
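Option A can be sketched without committing to a particular CRM: map the enriched columns, pick the dedupe key, and hand fixed-size batches to whatever bulk-upsert call your CRM client provides. The CRM field names, the batch size of 100, and the `upsert_batch` callable below are all placeholders.

```python
def to_crm_contact(lead):
    """Map enriched columns onto CRM field names (the right-hand names are hypothetical)."""
    return {
        "external_id": lead.get("linkedin") or lead.get("email"),   # dedupe key
        "name": lead["full_name"],
        "company": lead.get("company"),
        "email": lead.get("email"),
        "phone": lead.get("phone"),
        "linkedin_url": lead.get("linkedin"),
    }

def batches(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def sync(leads, upsert_batch, batch_size=100):
    """Upsert leads in batches; skips rows with no usable dedupe key."""
    contacts = [c for c in map(to_crm_contact, leads) if c["external_id"]]
    for chunk in batches(contacts, batch_size):
        upsert_batch(chunk)         # stand-in for your CRM client's bulk upsert
    return len(contacts)

leads = [{"full_name": f"Lead {n}", "company": "CloudCore",
          "linkedin": f"https://linkedin.example/in/lead-{n}", "email": "", "phone": ""}
         for n in range(250)]
leads.append({"full_name": "No Key", "company": "CloudCore", "linkedin": "", "email": ""})

sent = []
synced = sync(leads, upsert_batch=lambda chunk: sent.append(len(chunk)))
print(synced, sent)     # 250 [100, 100, 50]
```

Rows with neither a LinkedIn URL nor an email are skipped here because they have no key to upsert on. Whether to create them anyway is a choice for your CRM workflow.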

Option B — No-code via Zapier or Make

For lists that land continuously (a webform, a new-row trigger), call the endpoint from a generic HTTP step. One connection then fans out to HubSpot, Salesforce, Pipedrive, Airtable, Google Sheets, and thousands of other apps.

# Zapier: "Webhooks by Zapier" -> POST action
URL:     https://api.linkfinderai.com
Method:  POST
Headers: Authorization: Bearer YOUR_API_KEY
         Content-Type: application/json
Body:    {"type": "lead_full_name_to_linkedin_url",
          "input_data": "{{full_name}} {{company}}"}

# Make: HTTP -> "Make a request" module
URL:     https://api.linkfinderai.com
Method:  POST
Headers: Authorization: Bearer YOUR_API_KEY
Body:    JSON -> {"type": "linkedin_profile_to_email",
                 "input_data": "{{linkedin_url}}"}

Re-enrich on a schedule. People change jobs and companies grow, so a list that was complete six months ago is already drifting. A monthly job that re-runs only your highest-value accounts keeps the CRM current without re-spending on everyone.
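Selecting what a monthly refresh job should re-run can be a simple filter over your existing records. This sketch assumes each record carries a `last_enriched` ISO date and an `account_value` number. Both fields, and the 30-day and 10,000 thresholds, are hypothetical, so substitute whatever your CRM actually stores.

```python
from datetime import date

def due_for_refresh(records, today, max_age_days=30, min_value=10_000):
    """Return records that are both high-value and stale.

    `last_enriched` (ISO date string) and `account_value` are hypothetical
    CRM fields; substitute whatever your CRM actually stores.
    """
    due = []
    for rec in records:
        age = (today - date.fromisoformat(rec["last_enriched"])).days
        if rec["account_value"] >= min_value and age >= max_age_days:
            due.append(rec)
    return due

records = [
    {"full_name": "Sarah Mitchell", "account_value": 50_000, "last_enriched": "2024-01-05"},
    {"full_name": "Low Value",      "account_value": 500,    "last_enriched": "2024-01-05"},
    {"full_name": "Fresh Lead",     "account_value": 80_000, "last_enriched": "2024-02-25"},
]
stale = due_for_refresh(records, today=date(2024, 3, 1))
print([r["full_name"] for r in stale])   # ['Sarah Mitchell']
```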

Enrich your whole list in one run

Spin up the bulk pipeline above on the free tier — 100 credits, an API on every plan, and flat 1-credit-per-request pricing with no annual contract.

No credit card required • API on every plan • Flat pricing • Cancel anytime