Enriching one lead is easy. Enriching fifty thousand is a different problem: now you care about throughput, rate limits, retries, resumability, and — above all — not setting fire to your credit balance on rows you didn't need. Bulk enrichment is less about the lookup and more about the loop around it.
This tutorial builds a production-grade bulk pipeline against the LinkFinder AI API end to end: cleaning and de-duplicating your input, enriching a CSV row by row, layering in safe concurrency that respects your plan's rate limit, making the whole job resumable so a crash doesn't cost you, controlling spend, and pushing the finished records into your CRM. Everything runs against a single endpoint — POST https://api.linkfinderai.com — and one request costs one credit, including lookups that come back empty, so cost stays easy to forecast.
Examples are in Python and Node.js. The free tier's 100 credits is enough to test the full flow on a small list before you scale it up.
Prepare and de-duplicate your list
This is the single highest-leverage step, and most people skip it. Because every lookup costs a credit — even one that returns nothing — a messy input list is money straight down the drain. Clean before you spend.
- De-duplicate on a stable key (LinkedIn URL if you have it, otherwise normalized name + company). Duplicates are pure waste.
- Normalize company names and trim whitespace so "CloudCore " and "cloudcore" collapse to one row.
- Drop rows you can't act on: no name and no identifier means no match, just a wasted credit.
- Skip what you already have. Don't re-enrich rows your CRM already filled in last month.
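The last bullet, skipping rows your CRM already filled in, can be sketched as a filter that runs before any credits are spent. This is a minimal sketch under assumptions: the CRM rows are taken to come from an export with `full_name`, `company`, and `email` columns (hypothetical names), and a non-empty `email` is treated as "already enriched". The sample rows are illustrative only.

```python
def normalize(s):
    """Lowercase and trim so 'CloudCore ' and 'cloudcore' compare equal."""
    return (s or "").strip().lower()

def lead_key(row):
    """Same stable key as the de-dupe step: normalized name + company."""
    return (normalize(row.get("full_name")), normalize(row.get("company")))

def drop_already_enriched(leads, crm_rows):
    """Keep only leads that the CRM export has not already filled in.

    Assumption: a CRM row counts as enriched when its 'email' column
    (a hypothetical column name) is non-empty.
    """
    known = {lead_key(r) for r in crm_rows if (r.get("email") or "").strip()}
    return [lead for lead in leads if lead_key(lead) not in known]

# Illustrative data, not real records.
crm_rows = [
    {"full_name": "Sarah Mitchell", "company": "CloudCore ", "email": "sarah@cloudcore.example"},
    {"full_name": "Dev Patel", "company": "Northwind", "email": ""},
]
leads = [
    {"full_name": "sarah mitchell", "company": "cloudcore"},
    {"full_name": "Dev Patel", "company": "Northwind"},
]

to_enrich = drop_already_enriched(leads, crm_rows)
print(to_enrich)
```

Only the second lead survives the filter, because the first one already has an email on the CRM side. In practice the two lists would be loaded with `csv.DictReader`, the same way the cleaning script reads its input.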
```python
import csv

def normalize(s):
    return (s or "").strip().lower()

seen, clean_rows = set(), []
with open("leads_raw.csv", newline="") as f:
    for row in csv.DictReader(f):
        name, company = row.get("full_name", ""), row.get("company", "")
        if not name.strip():
            continue  # nothing to match on
        key = (normalize(name), normalize(company))
        if key in seen:
            continue  # duplicate -> skip
        seen.add(key)
        clean_rows.append({"full_name": name.strip(), "company": company.strip()})

with open("leads.csv", "w", newline="") as f:
    w = csv.DictWriter(f, fieldnames=["full_name", "company"])
    w.writeheader()
    w.writerows(clean_rows)

print(f"Kept {len(clean_rows)} unique leads.")
```

Set up the API and a single enrichment
Grab your API key from Settings → API Key and store it as an environment variable — never hardcode it. Every request is a POST with a type (which enrichment) and input_data (what you know), authenticated with a Bearer token.
Before looping over thousands of rows, wrap a single call in a helper that branches on status and returns None (null in Node.js) on a miss. A clean per-row chain takes a name + company, resolves the LinkedIn URL, then appends email, phone, and the company website:
```python
import os, requests

API_KEY = os.environ["LINKFINDER_API_KEY"]
BASE_URL = "https://api.linkfinderai.com"

def call(enrichment_type, input_data, **extra):
    """One request = one credit. Returns result or None."""
    resp = requests.post(
        BASE_URL,
        headers={"Authorization": f"Bearer {API_KEY}",
                 "Content-Type": "application/json"},
        json={"type": enrichment_type, "input_data": input_data, **extra},
        timeout=30,
    )
    resp.raise_for_status()  # surface HTTP errors instead of parsing them
    data = resp.json()
    return data.get("result") if data.get("status") == "success" else None

def enrich_lead(full_name, company):
    lead = {"full_name": full_name, "company": company}
    url = call("lead_full_name_to_linkedin_url", f"{full_name} {company}")
    if not url:
        return lead  # nothing to chain off of
    lead["linkedin"] = url
    lead["email"] = call("linkedin_profile_to_email", url)
    lead["phone"] = call("linkedin_profile_to_phone", url)
    lead["website"] = call("company_name_to_website", company)
    return lead

print(enrich_lead("Sarah Mitchell", "CloudCore"))
```

```javascript
const API_KEY = process.env.LINKFINDER_API_KEY;
const BASE_URL = "https://api.linkfinderai.com";

async function call(type, input_data, extra = {}) {
  const resp = await fetch(BASE_URL, {
    method: "POST",
    headers: { "Authorization": `Bearer ${API_KEY}`,
               "Content-Type": "application/json" },
    body: JSON.stringify({ type, input_data, ...extra }),
  });
  // Mirror raise_for_status() in the Python helper: fail on HTTP errors.
  if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
  const data = await resp.json();
  return data.status === "success" ? data.result : null;
}

async function enrichLead(fullName, company) {
  const lead = { fullName, company };
  const url = await call("lead_full_name_to_linkedin_url", `${fullName} ${company}`);
  if (!url) return lead; // nothing to chain off of
  lead.linkedin = url;
  lead.email = await call("linkedin_profile_to_email", url);
  lead.phone = await call("linkedin_profile_to_phone", url);
  lead.website = await call("company_name_to_website", company);
  return lead;
}

// Top-level await: run this file as an ES module.
console.log(await enrichLead("Sarah Mitchell", "CloudCore"));
```

Run the bulk enrichment loop
The single-lead function scales straight to a list. Read the cleaned CSV, enrich each row, and stream results to an output file as you go (don't hold 50,000 records in memory). The one rule: stay under your plan's requests-per-second limit with a delay between calls.
```python
import csv, time

INPUT, OUTPUT = "leads.csv", "leads_enriched.csv"
REQS_PER_SEC = 5    # match your plan (Starter = 5/s)
CALLS_PER_LEAD = 4  # url + email + phone + website
# The limit counts requests, not leads, so budget the pause for the whole chain.
DELAY = CALLS_PER_LEAD / REQS_PER_SEC

with open(INPUT, newline="") as f_in, open(OUTPUT, "w", newline="") as f_out:
    reader = csv.DictReader(f_in)
    fields = ["full_name", "company", "linkedin", "email", "phone", "website"]
    writer = csv.DictWriter(f_out, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    for i, row in enumerate(reader, 1):
        lead = enrich_lead(row["full_name"], row["company"])  # helper from Step 2
        writer.writerow(lead)
        f_out.flush()      # persist as you go
        print(f"[{i}] {lead['full_name']} -> {lead.get('email')}")
        time.sleep(DELAY)  # stay under the rate limit

print("Done.")
```

This is correct and safe, but serial. At roughly one chain per second you'll process at most about 3,600 leads an hour: fine for a nightly job, slow for a big backlog. Step 4 speeds it up without tripping the rate limit.
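To see why the backlog matters, a back-of-the-envelope estimate helps. The sketch below assumes the numbers used elsewhere in this tutorial (50,000 leads, a four-call chain, the Starter plan's 5 requests per second) and ignores network latency and retries, so it gives a floor on wall-clock time rather than a prediction. The function name is illustrative.

```python
def estimate_hours(rows, calls_per_lead, reqs_per_sec):
    """Lower bound on run time: total requests / allowed requests per second."""
    total_requests = rows * calls_per_lead
    return total_requests / reqs_per_sec / 3600

hours = estimate_hours(rows=50_000, calls_per_lead=4, reqs_per_sec=5)
print(f"At least {hours:.1f} hours")  # 200,000 requests at 5/s
```

Because the rate limit counts requests, the floor depends only on the plan's ceiling and the chain length. Concurrency gets you closer to that floor by overlapping network waits, but it can't go below it.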
Add safe concurrency under your rate limit
The fastest correct bulk pipeline runs several leads in parallel while a shared limiter keeps total throughput under your plan's ceiling. The pattern: a worker pool bounded by your requests-per-second, with each worker processing one lead's chain at a time.
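```python
import csv, time, threading
from concurrent.futures import ThreadPoolExecutor

REQS_PER_SEC = 5  # your plan's ceiling
_lock = threading.Lock()
_last = [0.0]

def rate_limited():
    """Simple shared throttle across worker threads."""
    with _lock:  # sleeping inside the lock spaces requests 1/REQS_PER_SEC apart
        wait = (1.0 / REQS_PER_SEC) - (time.monotonic() - _last[0])
        if wait > 0:
            time.sleep(wait)
        _last[0] = time.monotonic()

# Throttle every request, not every lead: one chain makes up to four requests.
# Assumes call() and enrich_lead() from Step 2 live in this same file, so
# enrich_lead() picks up the throttled call() below.
_unthrottled_call = call

def call(enrichment_type, input_data, **extra):
    rate_limited()
    return _unthrottled_call(enrichment_type, input_data, **extra)

def worker(row):
    return enrich_lead(row["full_name"], row["company"])

with open("leads.csv", newline="") as f:
    rows = list(csv.DictReader(f))

with open("leads_enriched.csv", "w", newline="") as f_out, \
        ThreadPoolExecutor(max_workers=REQS_PER_SEC) as pool:
    fields = ["full_name", "company", "linkedin", "email", "phone", "website"]
    writer = csv.DictWriter(f_out, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    for lead in pool.map(worker, rows):  # results arrive in input order
        writer.writerow(lead)
        f_out.flush()

print("Done (concurrent).")
```

```javascript
// Bounded concurrency: N workers pulling from one shared queue.
const REQS_PER_SEC = 5; // match your plan's req/s ceiling
const CONCURRENCY = 5;  // leads in flight at once

// Shared throttle: hands out one request slot every 1000 / REQS_PER_SEC ms.
// Required: add `await throttle();` as the first line of call() from Step 2,
// so every request (not just every lead) is paced. One chain makes up to four.
let nextSlot = 0;
async function throttle() {
  const now = Date.now();
  const slot = Math.max(now, nextSlot);
  nextSlot = slot + 1000 / REQS_PER_SEC;
  if (slot > now) await new Promise(r => setTimeout(r, slot - now));
}

async function runBulk(rows, writeRow) {
  let i = 0;
  async function worker() {
    while (i < rows.length) {
      const row = rows[i++];
      const lead = await enrichLead(row.full_name, row.company);
      writeRow(lead);
    }
  }
  await Promise.all(Array.from({ length: CONCURRENCY }, worker));
}
```

| Plan | Credits / mo | Requests / sec | Batch size |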
|---|---|---|---|
| Starter | 5,000 | 5 req/s | Up to 500 |
| Professional | 20,000 | 10 req/s | Up to 500 |
| Enterprise | 50,000 | 20 req/s | Up to 500 |
| HyperGrowth | 250,000 | 50 req/s | Up to 500 |
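The table can double as a quick sizing check before a run. The sketch below copies the figures from the table and picks the smallest plan whose monthly credits cover a worst-case job. It assumes the whole run has to fit inside one month's allowance, which is a simplification made for this example; the helper name is illustrative.

```python
# (name, credits per month, requests per second), copied from the plan table.
PLANS = [
    ("Starter", 5_000, 5),
    ("Professional", 20_000, 10),
    ("Enterprise", 50_000, 20),
    ("HyperGrowth", 250_000, 50),
]

def smallest_plan_for(credits_needed):
    """Return the first plan whose monthly credits cover the run, or None."""
    for plan in PLANS:
        if plan[1] >= credits_needed:
            return plan
    return None

rows, calls_per_lead = 12_000, 4  # worst case: every chain runs all four calls
plan = smallest_plan_for(rows * calls_per_lead)
print(plan)
```

The returned requests-per-second figure is the value to plug into `REQS_PER_SEC` in the concurrent runner.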
Make the job resumable and retry failures
A bulk run is a long-running job, and long jobs crash — a network blip, a timeout, an out-of-credits error at row 38,000. Without resumability you either re-spend thousands of credits or lose work. Two safeguards fix this: a checkpoint of completed keys, and retry-with-backoff on transient failures.
```python
import os, csv, time, requests

DONE_FILE = "done_keys.txt"

def load_done():
    if not os.path.exists(DONE_FILE):
        return set()
    with open(DONE_FILE) as f:
        return set(line.strip() for line in f)

def mark_done(key):
    with open(DONE_FILE, "a") as f:
        f.write(key + "\n")

def call(enrichment_type, input_data, max_retries=4, **extra):
    delay = 1.0
    for _ in range(max_retries):
        try:
            resp = requests.post(
                BASE_URL,
                headers={"Authorization": f"Bearer {API_KEY}",
                         "Content-Type": "application/json"},
                json={"type": enrichment_type, "input_data": input_data, **extra},
                timeout=30,
            )
        except (requests.ConnectionError, requests.Timeout):
            time.sleep(delay)  # network blip or timeout: back off and retry
            delay *= 2
            continue
        if resp.status_code in (429, 500):
            time.sleep(delay)
            delay *= 2  # 1s -> 2s -> 4s -> 8s
            continue
        if resp.status_code == 402:
            raise RuntimeError("Out of credits: top up to resume.")
        resp.raise_for_status()
        data = resp.json()
        return data.get("result") if data.get("status") == "success" else None
    raise RuntimeError("Exhausted retries after repeated transient failures.")

done = load_done()
with open("leads.csv", newline="") as f:
    for row in csv.DictReader(f):
        key = f"{row['full_name']}|{row['company']}".lower()
        if key in done:
            continue  # already enriched -> skip
        lead = enrich_lead(row["full_name"], row["company"])
        # ...write lead to output (open the file in append mode when resuming)...
        mark_done(key)
```

The 402 Insufficient credits error is the one to handle loudly: catch it, stop cleanly, top up, and re-run. Because completed keys are checkpointed, the resumed run skips every finished lead, so at most the one chain that was in flight gets repeated.

Control credit spend
The cost of a bulk run is predictable: rows × calls-per-row × 1 credit, and empty results count. That makes spend a design decision, not a surprise on the invoice. The levers, from biggest to smallest:
| Lever | Effect | How |
|---|---|---|
| De-dupe input | Big | Remove duplicate & dead rows before any call (Step 1) |
| Trim the chain | Big | Only request fields you'll use — drop phone/website if unneeded |
| Skip enriched rows | Medium | Don't re-run records your CRM already has |
| Cache results | Medium | Store responses so a re-run never re-pays for a hit |
| Short-circuit chains | Medium | If the LinkedIn URL is null, stop — don't pay for downstream calls |
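The "Cache results" lever could be sketched as a small file-backed cache wrapped around whatever function makes the request. This is a minimal sketch, not part of the LinkFinder AI API. The class name, the default cache path, and the stand-in `fake_call` are all hypothetical, and results are assumed to be JSON-serializable. It also caches misses, on the reasoning that an empty result costs a credit too; drop that if you expect misses to become hits later.

```python
import json, os, tempfile

class ResultCache:
    """Tiny JSON-file cache keyed by enrichment type + input (hypothetical helper)."""

    def __init__(self, path="enrichment_cache.json"):
        self.path = path
        self.data = {}
        if os.path.exists(path):
            with open(path) as f:
                self.data = json.load(f)

    def get_or_call(self, call_fn, enrichment_type, input_data):
        key = f"{enrichment_type}|{input_data.strip()}"
        if key in self.data:
            return self.data[key]  # cached hit or cached miss: no credit spent
        result = call_fn(enrichment_type, input_data)
        self.data[key] = result
        with open(self.path, "w") as f:
            json.dump(self.data, f)  # persist so a re-run can reuse it
        return result

# Stand-in for the real call() helper so the sketch runs offline.
calls = []
def fake_call(enrichment_type, input_data):
    calls.append((enrichment_type, input_data))
    return "https://example.com/profile"

cache = ResultCache(os.path.join(tempfile.mkdtemp(), "cache.json"))
first = cache.get_or_call(fake_call, "lead_full_name_to_linkedin_url", "Sarah Mitchell CloudCore")
second = cache.get_or_call(fake_call, "lead_full_name_to_linkedin_url", "Sarah Mitchell CloudCore")
print(first == second, len(calls))
```

The second lookup is served from the cache, so the underlying function runs once. Rewriting the whole file on every miss keeps the sketch short; for tens of thousands of rows, an append-only log or SQLite would be a more sensible backing store.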
A quick cost estimate before you launch keeps the bill honest:
```python
rows = 12_000        # unique leads after de-duping
calls_per_lead = 4   # url + email + phone + website
worst_case = rows * calls_per_lead
print(f"Upper bound: {worst_case:,} credits")
# Real spend is lower: chains that miss the LinkedIn URL
# short-circuit and never make the 3 downstream calls.
```

A null result still costs 1 credit. The cheapest credit is the one you never spend: clean the list, trim the chain, and short-circuit dead leads before you scale up.

Sync enriched leads to your CRM
Once the enriched CSV is written, get it where your team works. Two paths:
Option A — Direct from code
Batch-upsert the enriched rows into your CRM's API after the run. Map email, phone, and linkedin onto your contact fields and use the LinkedIn URL (or email) as the dedupe key so re-runs update rather than duplicate.
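Option A might look like the sketch below. Every CRM has its own client and field names, so the target field names and the `upsert_batch` callable are placeholders for whatever your CRM's bulk endpoint expects. Only the shape comes from the paragraph above: map the fields, key on the LinkedIn URL or email, and send in batches.

```python
def to_crm_contact(lead):
    """Map an enriched row onto hypothetical CRM field names."""
    return {
        # LinkedIn URL first, email as fallback, so re-runs update in place.
        "dedupe_key": lead.get("linkedin") or lead.get("email"),
        "name": lead.get("full_name"),
        "company": lead.get("company"),
        "email": lead.get("email"),
        "phone": lead.get("phone"),
        "linkedin_url": lead.get("linkedin"),
    }

def batches(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

def sync_to_crm(leads, upsert_batch, batch_size=100):
    """Upsert contacts in batches; rows with no dedupe key are skipped."""
    contacts = [c for c in map(to_crm_contact, leads) if c["dedupe_key"]]
    for chunk in batches(contacts, batch_size):
        upsert_batch(chunk)  # placeholder: your CRM client's bulk upsert
    return len(contacts)

# Illustrative rows; `sent.append` stands in for a real CRM client.
sent = []
leads = [
    {"full_name": "Sarah Mitchell", "company": "CloudCore",
     "linkedin": "https://www.linkedin.com/in/example", "email": "", "phone": ""},
    {"full_name": "No Match", "company": "Unknown",
     "linkedin": "", "email": "", "phone": ""},
]
count = sync_to_crm(leads, sent.append, batch_size=1)
print(count, len(sent))
```

Rows with neither a LinkedIn URL nor an email are left out, since without a key an upsert can't tell an update from a duplicate.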
Option B — No-code via Zapier or Make
For lists that land continuously (a webform, a new-row trigger), call the endpoint from a generic HTTP step. One connection then fans out to HubSpot, Salesforce, Pipedrive, Airtable, Google Sheets, and thousands of other apps.
```
# Zapier: "Webhooks by Zapier" -> POST action
URL:     https://api.linkfinderai.com
Method:  POST
Headers: Authorization: Bearer YOUR_API_KEY
         Content-Type: application/json
Body:    {"type": "lead_full_name_to_linkedin_url",
          "input_data": "{{full_name}} {{company}}"}
```

```
# Make: HTTP -> "Make a request" module
URL:     https://api.linkfinderai.com
Method:  POST
Headers: Authorization: Bearer YOUR_API_KEY
Body:    JSON -> {"type": "linkedin_profile_to_email",
                  "input_data": "{{linkedin_url}}"}
```

Enrich your whole list in one run
Spin up the bulk pipeline above on the free tier — 100 credits, an API on every plan, and flat 1-credit-per-request pricing with no annual contract.