Let's NIP-05 scan the #spambots

Better coders can fine-tune this, I suppose.

Assumptions and Setup

Nostr Relay: You have a WebSocket connection to a Nostr relay (e.g., wss://relay.damus.io) to fetch events.

NIP-05 Provider Data: You have a database or API with member public keys and their NIP-05 registration timestamps.

Libraries: Uses websocket (from the websocket-client package) for Nostr relay communication, json for event parsing, random for sampling replies, sqlite3 for the member database, and requests for the NIP-05 verification checks.

Database: A simple SQLite database is assumed for storing member data, but you can adapt to your setup (e.g., MySQL, API).

Bot Detection: Basic bot detection flags near-identical replies (Jaccard similarity over word sets) and nonsensical replies (very short text or a single repeated character). You can enhance this with NLP libraries like nltk or textblob for more advanced analysis.
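One cheap enhancement that needs no NLP library is a character-entropy check. The sketch below is only an illustration: the function names and both thresholds are assumptions that are not part of the script, and the cut-offs would need tuning on real notes.

```python
import math
from collections import Counter


def shannon_entropy(text: str) -> float:
    """Return the Shannon entropy of `text` in bits per character."""
    if not text:
        return 0.0
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def looks_like_noise(text: str, low: float = 1.5, high: float = 4.8) -> bool:
    """Hypothetical heuristic: flag text with unusually low or high entropy.

    Very low entropy suggests repeated characters; very high entropy suggests
    random strings. Both thresholds are guesses, not measured values.
    """
    entropy = shannon_entropy(text)
    return entropy < low or entropy > high
```

A check like this could sit next to the length and repeated-character tests in is_nonsense, but it will misfire on some legitimate notes (for example ones that are mostly a pasted key or hash).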

Script

import websocket  # provided by the websocket-client package
import json
import time
import random
import sqlite3
from datetime import datetime, timedelta
import requests
import re

# Configuration
RELAY_URL = "wss://relay.damus.io"  # Replace with your relay
DB_PATH = "nip05_members.db"  # SQLite database for NIP-05 members
TIME_WINDOW = 7  # Days to check for recent activity
REPLY_SAMPLE_SIZE = 5  # Number of replies to analyze
SIMILARITY_THRESHOLD = 0.9  # Jaccard similarity threshold for near-identical replies

# Initialize database
def init_db():
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS members (
            pubkey TEXT PRIMARY KEY,
            nip05_address TEXT,
            registration_timestamp INTEGER
        )
    """)
    # Example: Populate with dummy data (replace with your actual data)
    cursor.execute("INSERT OR IGNORE INTO members VALUES (?, ?, ?)",
                   ("b0635d6a9851d3aed0cd6c495b282167acf761729078d975fc341b22650b07b9",
                    "bob@example.com", int(time.time() - 86400 * 30)))
    conn.commit()
    return conn, cursor

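# Connect to Nostr relay
def connect_to_relay():
    ws = websocket.WebSocket()
    # Set a receive timeout so ws.recv() raises WebSocketTimeoutException
    # instead of blocking forever on a quiet relay
    ws.settimeout(10)
    ws.connect(RELAY_URL)
    return ws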

# Fetch events from relay
def fetch_events(ws, pubkey, since_timestamp, kind=1):
    subscription_id = f"scan_{pubkey[:8]}"
    event_filter = {
        "authors": [pubkey],
        "kinds": [kind],  # kind=1 for posts/replies
        "since": since_timestamp
    }
    ws.send(json.dumps(["REQ", subscription_id, event_filter]))
    events = []
    deadline = time.time() + 10  # 10-second overall timeout
    while time.time() < deadline:
        try:
            message = json.loads(ws.recv())
        except websocket.WebSocketTimeoutException:
            break
        if len(message) < 2 or message[1] != subscription_id:
            continue  # e.g. a NOTICE, or a message for another subscription
        if message[0] == "EVENT":
            events.append(message[2])
        elif message[0] in ("EOSE", "CLOSED"):
            break  # relay has sent all stored events (or ended the subscription)
    ws.send(json.dumps(["CLOSE", subscription_id]))
    return events

# Calculate Jaccard similarity between two texts
def jaccard_similarity(text1, text2):
    if not text1 or not text2:
        return 0.0
    set1 = set(re.findall(r'\w+', text1.lower()))
    set2 = set(re.findall(r'\w+', text2.lower()))
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return intersection / union if union > 0 else 0.0

# Check if text appears nonsensical (basic heuristic)
def is_nonsense(text):
    # Basic check: too short, repetitive characters, or random strings
    if len(text) < 10:
        return True
    if re.match(r'^(.)\1{3,}$', text):  # One character repeated for the whole text
        return True
    # Add more checks (e.g., entropy, common spam phrases)
    return False

# Check NIP-05 registration status
def check_nip05_status(pubkey, nip05_address):
    try:
        local_part, domain = nip05_address.split('@')
        url = f"https://{domain}/.well-known/nostr.json?name={local_part}"
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            data = response.json()
            if data.get("names", {}).get(local_part) == pubkey:
                return True
        return False
    except Exception as e:
        print(f"Error checking NIP-05 for {nip05_address}: {e}")
        return False

# Main scanning function
def scan_for_spam_bots():
    conn, cursor = init_db()
    ws = connect_to_relay()
    # Get all members
    cursor.execute("SELECT pubkey, nip05_address, registration_timestamp FROM members")
    members = cursor.fetchall()
    # Calculate timestamp for TIME_WINDOW days ago
    since_timestamp = int((datetime.now() - timedelta(days=TIME_WINDOW)).timestamp())
    log = []
    for pubkey, nip05_address, reg_timestamp in members:
        print(f"Scanning {nip05_address} ({pubkey[:8]}...)")
        # Step 1: Check for at least one reply within the time window
        events = fetch_events(ws, pubkey, since_timestamp, kind=1)
        # Tags are lists such as ["e", "<event id>", ...], so check each tag's
        # first element; events with an 'e' tag are treated as replies
        replies = [e for e in events
                   if any(tag and tag[0] == "e" for tag in e.get("tags", []))]
        if len(replies) < 1:
            log.append({
                "pubkey": pubkey,
                "nip05_address": nip05_address,
                "status": f"No replies in past {TIME_WINDOW} days",
                "nip05_active": check_nip05_status(pubkey, nip05_address),
                "registration_date": datetime.fromtimestamp(reg_timestamp).isoformat()
            })
            continue
        # Step 2: Analyze up to REPLY_SAMPLE_SIZE random replies
        sample_replies = random.sample(replies, min(len(replies), REPLY_SAMPLE_SIZE))
        suspicious = False
        reply_texts = [r["content"] for r in sample_replies]
        # Check for identical or similar replies
        for i, text1 in enumerate(reply_texts):
            for text2 in reply_texts[i+1:]:
                if jaccard_similarity(text1, text2) > SIMILARITY_THRESHOLD:
                    suspicious = True
                    break
            if suspicious:
                break
        # Check for nonsensical replies
        if not suspicious:
            suspicious = any(is_nonsense(text) for text in reply_texts)
        # Step 3: Check NIP-05 status (for logging only)
        nip05_active = check_nip05_status(pubkey, nip05_address)
        # Log results
        log.append({
            "pubkey": pubkey,
            "nip05_address": nip05_address,
            "status": "Suspicious" if suspicious else "Clean",
            "reply_count": len(replies),
            "nip05_active": nip05_active,
            "registration_date": datetime.fromtimestamp(reg_timestamp).isoformat()
        })
    ws.close()
    conn.close()
    # Save log to file
    with open("spam_scan_log.json", "w") as f:
        json.dump(log, f, indent=2)
    return log

# Run the script
if __name__ == "__main__":
    results = scan_for_spam_bots()
    print("Scan complete. Results saved to spam_scan_log.json")
    for result in results:
        print(result)

I'm rather sure this won't work, for multiple reasons:

Every relay you connect to would have to implement this detection and/or filtering.

NIP-05, as far as my understanding goes, isn't failsafe or intended for this purpose (the addresses are identifiers, not verifiers, and appear spoofable?).
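To make the "identifier, not verifier" point concrete: a NIP-05 check only asks whether the domain's /.well-known/nostr.json maps a name to a given public key. The sketch below runs that comparison against a made-up document; nip05_matches and the sample data are hypothetical, and the second key is a dummy value.

```python
def nip05_matches(document: dict, name: str, pubkey: str) -> bool:
    """Return True if the nostr.json document maps `name` to `pubkey`."""
    return document.get("names", {}).get(name) == pubkey


# Made-up example of what a domain might serve at /.well-known/nostr.json.
# Whoever controls the domain decides which names point at which keys.
BOB_KEY = "b0635d6a9851d3aed0cd6c495b282167acf761729078d975fc341b22650b07b9"
DUMMY_BOT_KEY = "0" * 63 + "1"  # placeholder, not a real key

sample_document = {
    "names": {
        "bob": BOB_KEY,
        "friendly-human": DUMMY_BOT_KEY,
    }
}

print(nip05_matches(sample_document, "bob", BOB_KEY))
print(nip05_matches(sample_document, "friendly-human", DUMMY_BOT_KEY))
```

Both lookups succeed, which is the problem: the check says who the domain vouches for by name, not whether the key behind the name behaves like a human.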

The suggested bot detection in the script is not appropriate for this behavior: the bots mangle the OP, so you need to scan both the OP and all of its replies to be somewhat successful. That means determining the spam behavior isn't trivial, and spammers can easily circumvent it: once they notice it stops working, they will change the mangling algorithm.
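A rough sketch of what "scan the OP and the replies" could look like, assuming you have already fetched the parent events into a dict keyed by event id (events_by_id is hypothetical, as are both function names). It uses difflib from the standard library instead of the script's Jaccard measure, because character-level matching tolerates small manglings. The tag handling is a best-effort guess at reply markers, not a full implementation of the threading rules.

```python
from difflib import SequenceMatcher
from typing import Optional


def parent_event_id(event: dict) -> Optional[str]:
    """Best-effort guess at the id of the event this one replies to."""
    e_tags = [t for t in event.get("tags", []) if len(t) >= 2 and t[0] == "e"]
    if not e_tags:
        return None
    # Prefer explicitly marked tags; otherwise fall back to the last "e" tag.
    for marker in ("reply", "root"):
        for tag in e_tags:
            if len(tag) >= 4 and tag[3] == marker:
                return tag[1]
    return e_tags[-1][1]


def similarity_to_parent(reply: dict, events_by_id: dict) -> float:
    """Return a 0..1 similarity between a reply and the post it answers."""
    parent = events_by_id.get(parent_event_id(reply))
    if parent is None:
        return 0.0  # parent not fetched, nothing to compare against
    return SequenceMatcher(
        None, parent["content"].lower(), reply["content"].lower()
    ).ratio()
```

A reply that scores very high against its own parent is a candidate for "mangled OP" spam, but as said above, any fixed threshold is easy to dodge once the spammer changes how the text is mangled.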

Maybe detecting them by comparing the post's creation time with the reply's creation time is more efficient, as they appear to reply almost instantly once a post is published. But again, this is easily circumvented.
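The timing idea could be sketched like this, assuming you have collected (parent created_at, reply created_at) pairs for one account. The function names and both thresholds are assumptions for illustration only. Keep in mind that created_at is set by whoever signs the event, so a bot can simply backdate or delay its replies.

```python
from statistics import median


def reply_latencies(pairs):
    """Seconds between each parent post and its reply.

    `pairs` is an iterable of (parent_created_at, reply_created_at) Unix
    timestamps. Replies dated before their parent are skipped as unusable.
    """
    return [reply_ts - parent_ts for parent_ts, reply_ts in pairs if reply_ts >= parent_ts]


def replies_suspiciously_fast(pairs, max_median_seconds=5, min_samples=3):
    """Hypothetical heuristic: flag accounts whose median reply latency is tiny."""
    latencies = reply_latencies(pairs)
    if len(latencies) < min_samples:
        return False  # not enough data to say anything
    return median(latencies) <= max_median_seconds
```

Using the median rather than the mean keeps one slow reply from hiding an otherwise instant-reply pattern.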

Unlike traditional anti-spam measures, which are mostly specific to centralised systems, you can't block a user from creating an account, nor keep a centralised store of blocked IPs or other metadata. Detecting bots the Cloudflare proof-of-work way (browser checks) isn't possible either.

So. That's where my understanding stops. Nor do I have ideas.

#nostr #asknostr might check my claims for correctness, though.

Discussion

makes sense...

Thinking here... there must be something that can be done in a smart way