summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--filter_rspamd.go238
1 files changed, 238 insertions, 0 deletions
diff --git a/filter_rspamd.go b/filter_rspamd.go
new file mode 100644
index 0000000..b0ecede
--- /dev/null
+++ b/filter_rspamd.go
@@ -0,0 +1,238 @@
+// Copyright (c) 2019 Sunil Nimmagadda <sunil@nimmagadda.net>
+//
+// Permission to use, copy, modify, and distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+package main
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "log"
+ "net/http"
+ "net/mail"
+ "os"
+ "strings"
+)
+
+const rspamdURL = "http://localhost:11333/checkv2"
+
+var stdout *log.Logger
+
+type session struct {
+ ch <-chan string
+ control map[string]string
+ id string
+ payload *strings.Builder
+}
+
+type rspamdResponse struct {
+ Score float32
+ RequiredScore float32 `json:"required_score"`
+ Subject string
+ Action string
+ DKIMSig string `json:"dkim-signature"`
+}
+
+func linkConnect(s *session, args []string) {
+ rdns, laddr := args[6], args[8]
+ s.control["Pass"] = "all"
+ p := strings.Split(laddr, ":")
+ if p[0] != "local" {
+ s.control["Ip"] = p[0]
+ }
+ if rdns != "" {
+ s.control["Hostname"] = rdns
+ }
+}
+
+func linkIdentify(s *session, args []string) {
+ s.control["Helo"] = args[6]
+}
+
+func txBegin(s *session, args []string) {
+ s.control["Queue-Id"] = args[6]
+}
+
+func txMail(s *session, args []string) {
+ mail_from, status := args[7], args[8]
+ if status == "ok" {
+ s.control["From"] = mail_from
+ }
+}
+
+func txRcpt(s *session, args []string) {
+ rcpt_to, status := args[7], args[8]
+ if status == "ok" {
+ s.control["Rcpt"] = rcpt_to
+ }
+}
+
+func txData(s *session, args []string) {
+ status := args[7]
+ if status == "ok" {
+ s.control = nil
+ }
+}
+
+func txCleanup(s *session, args []string) {
+ s.control = nil
+}
+
+func filterCommit(s *session, args []string) {
+ token := args[5]
+ reason := <-s.ch
+ if reason != "" {
+ stdout.Printf("filter-result|%s|%s|reject|%s\n",
+ token, s.id, reason)
+ return
+ }
+ stdout.Printf("filter-result|%s|%s|proceed\n", token, s.id)
+}
+
+func filterDataLine(s *session, args []string) {
+ token, line := args[5], args[7]
+ if line != "." {
+ s.payload.WriteString(line)
+ s.payload.WriteString("\n")
+ return
+ }
+ s.ch = dataOutput(s.control, token, s.id, s.payload.String())
+}
+
+func rspamdPost(hdrs map[string]string, data string) (*rspamdResponse, error) {
+ r := strings.NewReader(data)
+ client := &http.Client{}
+ req, err := http.NewRequest("POST", rspamdURL, r)
+ if err != nil {
+ return nil, err
+ }
+ for k, v := range hdrs {
+ req.Header.Add(k, v)
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ rr := &rspamdResponse{}
+ if err := json.NewDecoder(resp.Body).Decode(rr); err != nil {
+ return nil, err
+ }
+ return rr, nil
+}
+
+func dataOutput(headers map[string]string,
+ token, id, data string) <-chan string {
+ ch := make(chan string)
+ go func() {
+ resp, err := rspamdPost(headers, data)
+ if err != nil {
+ log.Fatal(err)
+ }
+ log.Printf("%v\n", resp)
+ m, err := mail.ReadMessage(strings.NewReader(data))
+ if err != nil {
+ log.Fatal(err)
+ }
+ rejectReason := ""
+ switch resp.Action {
+ case "add header":
+ m.Header["X-Spam"] = []string{"yes"}
+ m.Header["X-Spam-Score"] = []string{
+ fmt.Sprintf("%v / %v",
+ resp.Score, resp.RequiredScore)}
+ case "rewrite subject":
+ m.Header["Subject"] = []string{resp.Subject}
+ case "reject":
+ rejectReason = "550 message rejected"
+ case "greylist":
+ rejectReason = "421 greylisted"
+ case "soft reject":
+ rejectReason = "451 try again later"
+ }
+ // Write DKIM-Signature header first if present
+ if resp.DKIMSig != "" {
+ stdout.Printf("filter-dataline|%s|%s|%s: %s\n",
+ token, id, "DKIM-Signature", resp.DKIMSig)
+ }
+ // preserve order?
+ for k, v := range m.Header {
+ stdout.Printf("filter-dataline|%s|%s|%s: %s\n",
+ token, id, k, strings.Join(v, ","))
+ }
+ // Blank line seperates headers and body
+ stdout.Printf("filter-dataline|%s|%s|\n", token, id)
+ s := bufio.NewScanner(m.Body)
+ for s.Scan() {
+ stdout.Printf("filter-dataline|%s|%s|%s\n",
+ token, id, s.Text())
+ }
+ stdout.Printf("filter-dataline|%s|%s|%s\n", token, id, ".")
+ ch <- rejectReason
+ }()
+ return ch
+}
+
+func main() {
+ log.SetFlags(0)
+ log.SetPrefix("filter_rspamd: ")
+ stdout = log.New(os.Stdout, "", 0)
+ registry := map[string]struct {
+ kind string
+ fn func(*session, []string)
+ }{
+ "link-connect": {"report", linkConnect},
+ "link-disconnect": {"report", nil},
+ "link-identify": {"report", linkIdentify},
+ "tx-begin": {"report", txBegin},
+ "tx-data": {"report", txData},
+ "tx-mail": {"report", txMail},
+ "tx-rcpt": {"report", txRcpt},
+ "tx-commit": {"report", txCleanup},
+ "tx-rollback": {"report", txCleanup},
+ "commit": {"filter", filterCommit},
+ "data-line": {"filter", filterDataLine},
+ }
+ for k, v := range registry {
+ fmt.Printf("register|%s|smtp-in|%s\n", v.kind, k)
+ }
+ fmt.Println("register|ready")
+ sessions := map[string]*session{}
+ var event, id string
+ stdin := bufio.NewScanner(os.Stdin)
+ for stdin.Scan() {
+ fields := strings.Split(stdin.Text(), "|")
+ switch fields[0] {
+ case "report":
+ id = fields[5]
+ case "filter":
+ id = fields[6]
+ default:
+ log.Fatalf("Unknown kind: %s", fields[0])
+ }
+ event = fields[4]
+ switch event {
+ case "link-disconnect":
+ delete(sessions, id)
+ case "link-connect":
+ sessions[id] = &session{
+ control: map[string]string{},
+ id: id,
+ payload: &strings.Builder{}}
+ fallthrough
+ default:
+ registry[event].fn(sessions[id], fields)
+ }
+ }
+}