Consuming Data
This guide covers how to consume and process signal-spec data for analysis, storage, and reporting.
Overview
Signals, root causes, and remediations are processed and then routed to storage, analysis, and reporting:
flowchart TD
SIG[Signals] --> PROC[Processing]
RC[Root Causes] --> PROC
REM[Remediations] --> PROC
PROC --> STORE[Storage]
PROC --> ANALYZE[Analysis]
PROC --> REPORT[Reporting]
STORE --> DB[(Database)]
ANALYZE --> LLM[LLM Analysis]
REPORT --> XLSX[XLSX Reports]
Loading Data
From JSON Files
import (
"encoding/json"
"os"
"github.com/plexusone/signal-spec/pkg/rootcause"
)
// LoadRootCauses reads filename and decodes its JSON content into root causes.
//
// The file may hold either a JSON array of root causes or a single root cause
// object; a single object is returned as a one-element slice.
//
// It returns the os.ReadFile error when the file cannot be read. On a decode
// failure it returns the error for the shape the document actually has: the
// array error when the document starts with '[', otherwise the single-object
// error.
func LoadRootCauses(filename string) ([]rootcause.RootCause, error) {
	data, err := os.ReadFile(filename)
	if err != nil {
		return nil, err
	}

	var rcs []rootcause.RootCause
	arrErr := json.Unmarshal(data, &rcs)
	if arrErr == nil {
		return rcs, nil
	}
	// The document is an array that failed to decode. Retrying it as a single
	// object would only swap the useful error (e.g. a bad field in one
	// element) for a generic "cannot unmarshal array" one, so report the
	// array error directly.
	if startsWithJSONArray(data) {
		return nil, arrErr
	}

	// Not an array: try a single object.
	var rc rootcause.RootCause
	if err := json.Unmarshal(data, &rc); err != nil {
		return nil, err
	}
	return []rootcause.RootCause{rc}, nil
}

// startsWithJSONArray reports whether the first non-whitespace byte of data
// is '['. Only the four JSON whitespace bytes are skipped.
func startsWithJSONArray(data []byte) bool {
	for _, b := range data {
		switch b {
		case ' ', '\t', '\r', '\n':
			continue
		}
		return b == '['
	}
	return false
}
Using the Export Package
import "github.com/plexusone/signal-spec/pkg/export"
// Load from file
// NOTE: this call and the directory call below are alternatives. Used together
// in one scope, the second `:=` does not compile ("no new variables on left
// side of :="); use `=` or distinct variable names if you need both.
rcs, err := export.LoadRootCausesFromFile("rootcauses.json")
// Load from directory
rcs, err := export.LoadRootCausesFromDir("./rootcauses/")
Storage Patterns
Relational Database
Map signal-spec types to relational tables (PostgreSQL syntax shown; `JSONB` and `TEXT[]` are PostgreSQL-specific types):
-- signals: relational mapping of signal-spec signals, keyed by id.
-- Columns without NOT NULL are optional and may be NULL.
CREATE TABLE signals (
id VARCHAR(255) PRIMARY KEY,
type VARCHAR(50) NOT NULL,
status VARCHAR(50) NOT NULL,
-- source_* columns are all nullable.
source_type VARCHAR(100),
source_name VARCHAR(100),
source_external_id VARCHAR(255),
-- domain_name is required; domain_subdomain is optional.
domain_name VARCHAR(100) NOT NULL,
domain_subdomain VARCHAR(100),
severity VARCHAR(20) NOT NULL,
summary TEXT NOT NULL,
description TEXT,
observed_at TIMESTAMP NOT NULL,
received_at TIMESTAMP NOT NULL,
-- Presumably holds a root_causes.id (verify); no FOREIGN KEY constraint is
-- declared, so the database does not enforce the link.
root_cause_id VARCHAR(255),
fingerprint VARCHAR(255),
metadata JSONB,
tags TEXT[],
-- Row insertion time; filled in by the database when omitted.
created_at TIMESTAMP DEFAULT NOW()
);
-- root_causes: relational mapping of signal-spec root causes, keyed by id.
-- Columns without NOT NULL are optional and may be NULL.
CREATE TABLE root_causes (
id VARCHAR(255) PRIMARY KEY,
title TEXT NOT NULL,
description TEXT,
status VARCHAR(50) NOT NULL,
-- domain_name is required; domain_subdomain is optional.
domain_name VARCHAR(100) NOT NULL,
domain_subdomain VARCHAR(100),
severity VARCHAR(20) NOT NULL,
-- Defaults to 0 when omitted on insert.
signal_count INTEGER DEFAULT 0,
priority_score INTEGER,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
owner_team VARCHAR(100),
-- No FOREIGN KEY constraint is declared for remediation_id.
remediation_id VARCHAR(255),
-- Defaults to 0 when omitted on insert.
recurrence_count INTEGER DEFAULT 0,
metadata JSONB,
tags TEXT[],
-- Row insertion time; filled in by the database when omitted.
created_at TIMESTAMP DEFAULT NOW()
);
-- Composite index: serves filters on domain_name alone or on
-- domain_name together with domain_subdomain.
CREATE INDEX idx_signals_domain ON signals(domain_name, domain_subdomain);
-- Serves lookups of the signals attached to a given root cause.
CREATE INDEX idx_signals_root_cause ON signals(root_cause_id);
-- Serve filters on root cause status and severity.
CREATE INDEX idx_root_causes_status ON root_causes(status);
CREATE INDEX idx_root_causes_severity ON root_causes(severity);
Document Database
Store as JSON documents with appropriate indexes:
// MongoDB example
// In an index specification, 1 means ascending order and -1 descending.

// signals: compound index on domain name + subdomain, plus single-field
// indexes on root_cause_id and on observed_at (descending).
db.signals.createIndex({ "domain.name": 1, "domain.subdomain": 1 });
db.signals.createIndex({ "root_cause_id": 1 });
db.signals.createIndex({ "observed_at": -1 });
// rootCauses: single-field indexes on status, domain name, and
// priority_score (descending).
db.rootCauses.createIndex({ "status": 1 });
db.rootCauses.createIndex({ "domain.name": 1 });
db.rootCauses.createIndex({ "priority_score": -1 });
Querying Patterns
Find Signals by Domain
// SignalsByDomain sketches a lookup of signals by domain, ordered by
// observed_at descending.
//
// The query has two placeholders: $1 is compared against domain_name and $2
// against domain_subdomain. When $2 is the empty string the subdomain
// condition is always true, so every subdomain of the domain matches.
// Presumably domain is bound to $1 and subdomain to $2 — the binding is not
// shown here.
//
// NOTE: this is an outline only. Query execution and the return statement are
// omitted, so the function does not compile as written.
func SignalsByDomain(domain, subdomain string) ([]signal.Signal, error) {
query := `
SELECT * FROM signals
WHERE domain_name = $1
AND ($2 = '' OR domain_subdomain = $2)
ORDER BY observed_at DESC
`
// Execute query...
}
Aggregate by Domain
// AggregateByDomain counts root causes per domain/subdomain pair.
//
// Each key of the returned map has the form "<domain name>|<subdomain>" and
// its value is the number of entries in rcs with that pair. An empty or nil
// rcs yields an empty, non-nil map.
func AggregateByDomain(rcs []rootcause.RootCause) map[string]int {
	totals := map[string]int{}
	for i := range rcs {
		dom := rcs[i].Domain
		totals[dom.Name+"|"+dom.Subdomain]++
	}
	return totals
}
Filter by Status
// ActiveRootCauses returns the entries of rcs whose status is StatusNew,
// StatusActive, or StatusMitigating, in their original order.
//
// The result is nil when no entry matches.
func ActiveRootCauses(rcs []rootcause.RootCause) []rootcause.RootCause {
	var open []rootcause.RootCause
	for i := range rcs {
		st := rcs[i].Status
		if st != rootcause.StatusNew &&
			st != rootcause.StatusActive &&
			st != rootcause.StatusMitigating {
			continue
		}
		open = append(open, rcs[i])
	}
	return open
}
Generating Reports
XLSX Summary Report
import "github.com/plexusone/signal-spec/pkg/export"
// Load root causes
rcs, err := export.LoadRootCausesFromFile("rootcauses.json")
if err != nil {
	log.Fatal(err)
}
// Build summary report
report := export.BuildSummaryReport(rcs)
// Optionally apply leader mappings. Because they are optional, a load failure
// is logged and the report is written without them instead of aborting.
leaders, leadersErr := export.LoadLeaderMappings("leaders.json")
if leadersErr != nil {
	log.Printf("leader mappings not applied: %v", leadersErr)
} else if leaders != nil {
	leaders.Apply(report)
}
// Write XLSX
if err := report.WriteXLSX("summary.xlsx"); err != nil {
	log.Fatal(err)
}
Using CLI
# Flags as used in these examples: -i input JSON file, -d input directory,
# -o output XLSX path, --leaders leader mappings file.

# Basic report
signal-spec report -i rootcauses.json -o summary.xlsx
# With leader mappings
signal-spec report -i rootcauses.json --leaders leaders.json -o summary.xlsx
# From directory of files
signal-spec report -d ./rootcauses/ -o summary.xlsx
Analytics Queries
Top Issues by Signal Count
-- The ten root causes with the highest signal_count among those whose status
-- is 'new', 'active', or 'mitigating'.
SELECT
domain_name,
domain_subdomain,
title,
signal_count,
priority_score
FROM root_causes
WHERE status IN ('new', 'active', 'mitigating')
ORDER BY signal_count DESC
LIMIT 10;
Trend Analysis
-- Daily signal counts per domain for signals observed in the last 30 days.
-- observed_at is truncated to the day so all signals of one day share a group.
SELECT
DATE_TRUNC('day', observed_at) as day,
domain_name,
COUNT(*) as signal_count
FROM signals
WHERE observed_at > NOW() - INTERVAL '30 days'
-- Grouping by the output alias "day" is accepted by PostgreSQL.
GROUP BY day, domain_name
ORDER BY day, domain_name;
Remediation Effectiveness
-- For each deployed remediation: the number of distinct signals observed
-- before deployment on any of its root causes, and the average
-- reduction_percent of its validation signals.
-- NOTE: this query reads remediations (with an array column root_cause_ids and
-- a deployed_at timestamp) and validation_signals; their DDL is not part of
-- the schema shown alongside the signals and root_causes tables.
SELECT
r.id,
r.title,
r.status,
-- DISTINCT is required: the two LEFT JOINs multiply rows, so each signal
-- appears once per matching validation_signals row.
COUNT(DISTINCT s.id) as pre_deploy_signals,
AVG(vs.reduction_percent) as avg_reduction
FROM remediations r
-- Signals whose root cause is one of the remediation's root causes and that
-- were observed before the remediation was deployed.
LEFT JOIN signals s ON s.root_cause_id = ANY(r.root_cause_ids)
AND s.observed_at < r.deployed_at
LEFT JOIN validation_signals vs ON vs.remediation_id = r.id
-- Only remediations that have been deployed.
WHERE r.deployed_at IS NOT NULL
GROUP BY r.id, r.title, r.status;
Event Processing
Stream Processing
// ProcessSignalStream consumes signals until the channel is closed.
//
// Each signal is validated, then stored, then handed to root cause analysis.
// A signal that fails validation or storage is logged and skipped; it does
// not stop the stream and is not passed to analysis.
func ProcessSignalStream(signals <-chan *signal.Signal) {
	for {
		sig, open := <-signals
		if !open {
			return
		}

		// Validate
		validationErr := validateSignal(sig)
		if validationErr != nil {
			log.Printf("invalid signal: %v", validationErr)
			continue
		}

		// Store
		storeErr := storeSignal(sig)
		if storeErr != nil {
			log.Printf("storage error: %v", storeErr)
			continue
		}

		// Trigger analysis
		triggerRootCauseAnalysis(sig)
	}
}
Webhook Handler
// HandleSignalWebhook accepts a signal posted as JSON and hands it to
// processSignal.
//
// Responses:
//   - 400 Bad Request when the body is not valid JSON for a signal or
//     exceeds the size cap.
//   - 500 Internal Server Error when processSignal fails.
//   - 202 Accepted on success.
//
// ReceivedAt and Status are always set by the server, overwriting whatever
// the sender supplied.
func HandleSignalWebhook(w http.ResponseWriter, r *http.Request) {
	// The body comes from an untrusted sender: cap how much is read so an
	// oversized payload cannot exhaust memory. Decode fails once the cap is
	// exceeded.
	const maxBodyBytes = 1 << 20 // 1 MiB
	r.Body = http.MaxBytesReader(w, r.Body, maxBodyBytes)

	var sig signal.Signal
	if err := json.NewDecoder(r.Body).Decode(&sig); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Server-controlled fields: never trust the values in the payload.
	sig.ReceivedAt = time.Now()
	sig.Status = signal.StatusNew

	if err := processSignal(&sig); err != nil {
		// NOTE(review): this returns the internal error text to the caller;
		// consider logging it and sending a generic message instead.
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}
Best Practices
Index Appropriately
Create indexes on commonly queried fields: domain, status, severity, timestamps.
Partition by Time
For high-volume signal tables, partition by observed_at for efficient querying and archival.
Handle Missing Fields
Optional fields (for example `domain_subdomain`, `description`, and `root_cause_id`) may be null or empty. Always check for this before using them.
Cache Aggregations
Domain summaries and counts can be expensive to compute. Cache or pre-compute them for dashboards.