N8N 自动化可编排流程,解放你的双手

在漫长的折腾生涯中,玩过太多东西,但是大浪淘沙,最终剩下来常用的只有那么几款。 在这个系列中,准备介绍一些非常有意思的开源产品。 n8n 是一个开源免费的自动化工作流平台,它提供了 200多个不同的节点来自动化工作流程,通过编排,可以实现跨不同服务的自动化流程。 相比市面上已有的 Zapier 等其他商业化自动化工具,n8n稍显稚嫩。但是免费开源以及强大的拓展性和自定义能力使得n8n在这个领域也是独树一帜。 部署 n8n 分为两个版本: 云服务版本: 官方提供的云服务版本,到官网上注册账号使用,需要付费 自托管版本: n8n项目本身有开源版本,可以自己编译源码部署,也可以通过官方提供的Docker镜像,使用 Docker 部署 推荐使用 Docker 进行部署,简单快捷省心 docker volume create n8n_data docker run -it --rm --name n8n -p 5678:5678 -v n8n_data:/home/node/.n8n docker.n8n.io/n8nio/n8n 在默认情况下,n8n会使用 SQLite 数据库进行数据存储,我们可以通过配置环境变量,把存储切换到 PostgresDB docker volume create n8n_data docker run -it --rm \ --name n8n \ -p 5678:5678 \ -e DB_TYPE=postgresdb \ -e DB_POSTGRESDB_DATABASE=<POSTGRES_DATABASE> \ -e DB_POSTGRESDB_HOST=<POSTGRES_HOST> \ -e DB_POSTGRESDB_PORT=<POSTGRES_PORT> \ -e DB_POSTGRESDB_USER=<POSTGRES_USER> \ -e DB_POSTGRESDB_SCHEMA=<POSTGRES_SCHEMA> \ -e DB_POSTGRESDB_PASSWORD=<POSTGRES_PASSWORD> \ -v n8n_data:/home/node/....

N8N 自动化可编排流程,解放你的双手

在数据采集中使用对象池的实践

在我的日常工作中,有很大精力投入到数据采集上。我需要从 syslog 采集大量数据,通常的流程是,将每条数据进行校验之后解析为对象进行一系列的处理与分析。这会产生大量对象,在 Java 中,大量对象必然意味着大量堆内存和频繁的 GC。为提高对象利用率,降低 GC 压力,我们基于对象池技术进行了一些优化手段。 一、为什么需要对象池 在数据采集系统中,每秒钟可能处理成千上万条日志记录,每条记录都需要转换为对象。频繁的对象创建和销毁会导致较高的性能开销,尤其是增加垃圾回收(GC)的频率,从而影响系统的整体性能。对象池通过复用对象减少创建和销毁的次数,提升性能和资源利用率。 二、对象池的原理 在 Java 中,说到池,我们通常会想到连接池、线程池。实际上,所有的池都是为了解决同一个问题:降低资源重复创建和销毁的频率。 对象池的工作机制与线程池和连接池相似。对象池通过维护一定数量的对象,当需要使用时从池中取出,使用完毕后再归还池中,避免了频繁的对象创建和销毁,显著减少了 GC 的负担。基本原理如下: 预创建对象:在初始化时,预先创建一组对象或线程,放入池中备用。 获取和归还:需要时从池中取出,使用完毕后归还池中。 复用机制:通过复用已有的对象或线程,避免频繁创建和销毁,提升系统性能。 三、自定义对象池的核心实现 以下是一个自定义对象池在数据采集场景中的实战示例代码: import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class ObjectPool<T> { private BlockingQueue<T> pool; private int maxPoolSize; private ObjectFactory<T> factory; public ObjectPool(int maxPoolSize, ObjectFactory<T> factory) { this.maxPoolSize = maxPoolSize; this.factory = factory; this.pool = new LinkedBlockingQueue<>(maxPoolSize); initializePool(); } private void initializePool() { for (int i = 0; i < maxPoolSize; i++) { pool....

在数据采集中使用对象池的实践

在业务中使用 Kafka 到底能不能保证消息的有序性

金三银四,最近开放简历做了一些面试。在一次面试中,就 Kafka 消息的有序性进行了一番讨论,这里贴一下相关思考。 首先贴结论: 在kafka中,多 partition 的情况下,kafka本身是无法保证消息的有序性的。但是可以通过逻辑控制保证消息的有序性。 为什么无序? 在Apache Kafka中,一个主题(Topic)可以被分为多个分区(Partitions),这种设计是为了实现水平扩展和提高吞吐量。每个分区都是一个有序的、不可变的消息序列,新的消息不断追加到序列的末尾。 然而,当一个主题包含多个分区时,Kafka的架构确实决定了它无法全局保证消息的有序性。主要原因如下: 生产者的分区策略:生产者可以根据消息的键(Key)或者自定义的分区策略来决定将消息发送到哪个分区。如果不同的消息使用了不同的键或者被发送到不同的分区,那么这些消息之间的顺序就无法得到保证 分区间的并行性:Kafka允许消费者并行地从多个分区中读取消息。由于不同分区的消息可以被不同的消费者实例同时处理,因此这些消息的到达和处理顺序在全局范围内是无法保证的 分区间的独立性:每个分区都是独立的,它们之间没有直接的顺序关联。生产者可以将消息发送到任意一个分区,而消费者也可以独立地从每个分区中消费消息。这种独立性意味着,即使在一个分区内部消息是有序的,但在不同分区之间的消息顺序是无法控制的 可以有序吗?如何保证? Kafka 每个分区都是一个有序的、不可变的消息序列,新的消息不断追加到序列的末尾。消费者按照消息在分区中的顺序来消费消息。因此,要保证消息的顺序处理,关键在于确保同一业务逻辑的消息发送到同一个分区。 可以通过以下方式来处理有序性需求: 单分区主题: 最简单的方法是为每个需要保证顺序的逻辑创建一个单独的Kafka主题,并设置该主题只有一个分区。这样,所有消息都会按照发送顺序被消费。但是,这种方法牺牲了Kafka的水平扩展能力。 使用相同的键: Kafka允许生产者为每条消息指定一个键(Key)。当消息被发送到Kafka时,Kafka会根据消息键的哈希值来决定将消息发送到哪个分区。因此,如果所有需要保证顺序的消息都使用相同的键,那么这些消息就会被发送到同一个分区,从而保证了顺序。 自定义分区策略: 如果默认的哈希分区策略不能满足需求,可以自定义分区策略。通过实现Partitioner接口,可以控制消息发送到哪个分区。例如,可以根据业务逻辑将属于同一顺序逻辑的消息发送到特定的分区。 消费者端顺序处理: 即使生产者保证了消息的顺序,消费者端也需要正确处理以维持顺序。消费者应该确保在处理完一条消息后,再拉取下一条消息,避免并发处理导致顺序混乱 注意事项 当使用多个消费者实例消费同一个分区时,无法保证消息的顺序处理 在保证顺序的同时,也要考虑系统的吞吐量和可用性,避免过度限制Kafka的性能

在业务中使用 Kafka 到底能不能保证消息的有序性

Java 程序优化之-如何更好的利用CPU

昨天,有人跟我聊起项目中对程序的优化,有一个特别有意思的话题《如何榨干一台机器的CPU》 现在的市面上,多核CPU是主流,有了多核的加持,可以更加有效的发挥硬件的能力,基于Java程序,我们究竟该如何更加有效的应用多核的能力?我个人经验来讲,主要考虑一下几个方面: 并行执行任务 减少共享数据的写操作 采用合适的方式处理竞争资源 减少数据拷贝次数 合适的GC 接下来详细说明。 1. 并行执行任务 合理利用多线程执行任务,可以有效的发挥CPU的多核性能。由于超线程技术的存在,实际上CPU可以并行执行的线程数量通常是物理核心数量的2倍。 我们都知道,在计算机中,进程是操作系统资源(内存、显卡、磁盘)分配的最小单位。线程是CPU执行调度的最小单位。 因此,实现并行计算的方式大体上有三种:多进程、多线程、多进程+多线程。具体采用哪种方式,就需要实际情况实际分析了。整体指导方针是:如果多线程可以解决,就不要尝试引入多进程。因为每个进程之间是独立的,多进程任务难免会涉及到进程之间通信,而进程之间的协调与通信通常会比较复杂。容易为程序引入额外的复杂度,得不偿失。 2. 减少共享数据的写操作 深入到线程中,每个线程都有自己的内存空间,在这个内存中,线程可以随意进行读写。因此多线程任务中,提高效率的优化手段之一就是: 尽量避免多个线程共同操作共享资源,如果条件允许,尽量采用以空间换时间的方式,将数据复制多份保存在每个线程单独的内存空间中。 如果必须存在共享内存的操作,我们的措施通常是,尽量减少共享数据的写操作,在共享内存中,多个线程的读操作是不存在资源的竞争的。一旦涉及到写共享内存,通常会使用 volatile 关键字保证内存数据对多个线程的可见性,这种情况下就不可避免的要涉及到插入内存屏障指令,用来保证处理器对指令的执行顺序不会打乱。相比不存在内存屏障的操作,性能会有所下降。 因此,需要尽量减少多个线程对共享内存的写操作。具体的方案是: 通过业务逻辑控制,在程序设计之初,排除掉共享数据的方案 在每个线程内部创建单独的对象,互不影响 使用 ThreadLocal 生成线程的本地对象副本 3. 采用合适的方式处理竞争资源 多线程任务中,涉及到资源竞争的部分,通常都需要采用对应的措施来保证资源的一致性。常见的解决方案有两种: 对资源加线程锁 采用乐观策略实现无锁操作(CAS) 线程锁的使用: 使用线程锁来保证资源的一致性是由来已久的一种非常简单便捷的方法。这种操作可以粗暴的控制多个线程对资源的访问,所以在处理多线程资源竞争关系的时候,我们通常会优先想到加锁的方式。 为了提高执行性能,通常会采用轻量级锁来代替重量级锁,在 Java 1.5 中 synchronize 是一个重量级锁,是相对低效率的;相比之下使用 Lock 对象的性能更高一些。但是这种情况到了 Java 1.6 发生了很大的变化,由于官方对 synchronize 引入了适应自旋、锁消除、轻量级锁、偏向锁等优化手段, synchronize 与 Lock 在性能上不存在什么差距。所以如果你使用高于 Java 1.6 的版本,请放心大胆的使用 synchronize 。 无锁操作(CAS): 对于传统的加锁操作,我们通常认为是悲观策略。相对于悲观策略,我们还有一个乐观策略可以选择。乐观策略认为不会存在资源不一致的情况,假如出现了,就再试一次。 实际上在 Java 中,一些锁的实现也利用了 CAS,体现在 Java 中的应用如下: 应用领域 示例 java....

Java 程序优化之-如何更好的利用CPU

数据处理中的责任链模式

在我的工作中,数据处理占据了比较大的权重。在数据处理的过程中,有一项比较繁琐的工作,就是对日志中的每个字段进行单独校验和处理,校验的内容大概有以下几类: 字段数量校验 字段为空判断 字段内容校验 特殊字段校验、信息补充 增加标签字段 在这类场景中,最常规的方法就是编写冗长的 if-else 代码段进行按部就班的校验,这显然不是最佳方案。 责任链模式可以有效地解决上述繁琐的工作。通过将不同的校验和处理逻辑分配给不同的处理者,形成一条责任链,数据依次通过各个处理者进行处理。这样做的好处是: 避免编写大量的重复 if-else 代码 将每个校验逻辑进行隔离,区分责任边界 逻辑清晰,代码简洁 以下是一个简单的责任链模式代码示例: public class ChainHandler { private List<Handler> handlers; public ChainHandler() { this.handlers = new ArrayList<>(); } public void addHandler(Handler handler) { handlers.add(handler); } public boolean handle(LogEntry logEntry) { for (Handler handler : handlers) { handler.handle(logEntry); if (!handler.isComplete()) { return false; // 如果处理程序未完成,则整个处理链失败 } } return true; // 如果所有处理程序均完成,则整个处理链成功 } } // 定义责任链接口 public interface Handler { boolean isComplete(); void handle(LogEntry logEntry); } // 字段数量校验 public class FieldCountHandler implements Handler { @Override public boolean isComplete() { // 检查字段数量是否满足要求 return true; } @Override public void handle(LogEntry logEntry) { // 进行字段数量的校验和处理 } } 在上述代码中,ChainHandler 类管理责任链中的处理者。FieldCountHandler 是处理字段数量的实现。...

数据处理中的责任链模式

SpringBoot 中实现订单过期自动取消

在电商等需要在线支付的应用中,通常需要设置订单自动取消的功能。本文将介绍几种在 Spring Boot 中实现订单 30 分钟自动取消的方案,包括定时任务、延迟队列和 Redis 过期事件。 方案一:定时任务 定时任务是一种简单且常用的实现订单自动取消的方案。在 Spring Boot 中,可以使用注解@Scheduled来定义定时任务,任务会按照指定的时间间隔执行。在这个方案中,我们可以定义一个定时任务,每隔 30 分钟检查一次未支付的订单,如果订单生成时间超过 30 分钟,则自动取消该订单。 代码示例: import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @EnableScheduling @Component public class OrderCancelSchedule { @Autowired private OrderService orderService; @Scheduled(cron = "0 0/1 * * *?") public void cancelUnpaidOrders() { List<Order> unpaidOrders = orderService.getUnpaidOrders(); unpaidOrders.forEach(order -> { if (order.getCreationTime().plusMinutes(30).isBefore(LocalDateTime.now())) { orderService.cancelOrder(order.getId()); } }); } } 在上面的代码中,我们定义了一个名为OrderCancelSchedule的组件,并使用@EnableScheduling注解启用定时任务功能。在组件中,我们定义了一个名为cancelUnpaidOrders的方法,并使用@Scheduled注解来指定该方法作为定时任务执行。cron表达式"0 0/1 * * *?"表示任务每隔 1 分钟执行一次。 方案二:延迟队列 延迟队列是一种将任务延迟执行的机制,入队的元素在一定的延迟时间之后才能出队。在这个方案中,我们可以将订单的 ID 放入延迟队列中,并设置延迟时间为 30 分钟。当延迟时间到期时,从队列中取出订单 ID,并执行取消订单的操作。...

SpringBoot 中实现订单过期自动取消

把 Obsidian 变为 Hugo 博客的集成管理平台

今天早上无聊闲逛,看到有不少人是从我的 Obsidian + Hugo 系列,开始了解原来 Obsidian 和 Hugo 还可以这么搭配的。 但是不少人反馈,根据 Hugo 博客写作最佳实践 和 Obsidian + Hugo 最佳配置推荐 这两篇文章操作起来感觉确实方便了,但是方便的不多。整个流程没有非常流畅。甚至感觉有些流程设计的过于繁琐。 不否认,确实是这种感觉,问题在我。 实际上这是我设计的一整套流程,但这两篇博客只讲了关键的一部分,还有一些细枝末节没有讲出来。 所以我临时决定再增加一篇文章说明,把整套流程讲清楚。担心表达能力欠佳,我还录了视频辅助大家理解。 中心思想 整个流程设计的中心思想就是:解决繁琐的操作流程,把操作自动化+高度集中,解放思想,精力都用在写作上。 管理面板 在 Obsidian 的诸多插件的加持下,最终实现所有管理功能集成在一个管理中心,效果如下: 上图中的内容共分为几部分: 统计图,这里只根据发布情况做了发布占比统计,大家完全可以根据自己的需求设计更美观实用的统计图表 操作按钮+笔记:共集成了三个操作按钮,点击新建博客按钮可以根据模板自动创建一篇空白文章;点击发布博客按钮可以把博客内容自动推送到 github,从而触发 github action 自动部署流程;点击获取更新按钮可以从github上同步最新的仓库内容。 草稿箱:使用 dataview 插件,把草稿内容列举出来,归集为草稿箱,直接点击对应草稿便可以开始编辑内容 已发布:把所有已经发布的内容列举出来,方便查看 视频演示 以下视频演示了整个 新建 -> 写作 -> 发布 流程,一刀未剪 技术细节 接下来说一下技术细节。说实话,这里其实并没有太多技术细节,只是一层窗户纸罢了,只要一捅破,大家瞬间就明白了,看到这里应该有很多人已经可以明白实现原理了。 基于 Hugo 博客写作最佳实践 这篇文章,我们可以通过 QuickAdd 插件实现各种自动化命令,在此基础上,只需要增加一个 Buttons 插件,便可以把命令通过点击按钮的方式进行调用。 Buttons 插件安装完成之后,在首页添加如下代码: 新建博客按钮代码 发布博客按钮代码 获取更新按钮代码 总结 以上就是全部内容了,没有什么高深的技术内容,都是一些插件运用的技巧。 有了这个管理面板之后,你是不是可以把精力全部放在写作上了呢?...

把 Obsidian 变为 Hugo 博客的集成管理平台

如果我们想实现一个 WAF之 -- 什么是 WAF

如果我们想实现一个简易的 WAF,我们实际上需要干什么?我们需要先了解什么是 WAF。 🐭 什么是 WAF WAF 全称 Web Application Firewall,是一种工作在应用层(7 层)的防火墙,主要用于对应用层中的 HTTP 流量进行监测、过滤和阻止。主要适用于 Web 应用中存在的已知的安全漏洞,例如:SQL 注入、CSRF 跨站请求伪造攻击、XSS 跨站脚本攻击等。 其主要防护原理是基于规则匹配,通过预制的识别规则,对 HTTP 协议中提取出来的信息进行匹配,如果可以匹配,则该请求被认为是攻击行为。WAF 将会对其执行相关的操作(告警、阻断、记录日志)。 综上所诉,WAF 的基本原理如下: 那么回到问题,什么是 WAF 呢? WAF 是一个工作在应用层,主要针对 HTTP 流量进行解析、检测的装置;其检测功能主要基于规则引擎,通过预制规则,对流量中的相关信息进行匹配,能够针对流量中的 SQL 注入、CSRF、XSS 等 Web 攻击行为进行识别与防护,防护手段主要有告警、阻断、记录日志。 🐮 WAF 通常部署在哪里 从形态上来讲, WAF 主要分为软件型 WAF 和硬件型 WAF 软件型 WAF 主要以嵌入的形式进行部署,一般部署在 Web 服务器中,常见的例如 nginx waf、apache waf、openresty waf 等。 硬件型 WAF 主要通过软件绑定硬件的方式,其部署方式多种多样,以下内容中的 WAF 默认指代硬件 WAF。 WAF 的部署模式通常有: 反向代理、透明代理、透明桥、流量镜像等方式。 反向代理 客户与 WAF 进行交互,WAF 将客户的请求直接转发至后端,后方的 Web Server 与 WAF 进行交互,不会直接暴漏给客户。...

如果我们想实现一个 WAF之 -- 什么是 WAF

盘点那些年我做过的东西

我是一个很容易焦虑的人,大部分焦虑都源自经济压力。 我也是一个很理想主义的人,始终妄想着能够做出一款为大家所认可的产品,随之产生睡后收入。 我还是一个很悲观的人,在做产品的过程中,随着不断的自我反思,会不断陷入:这个东西有人做了;这个东西不会有市场的;这个东西不是一个人能做出来的。等等类似的自我怀疑中。 这些年有不少想法,做了不少工作,但是都胎死腹中,分享出来供大家消遣。我的 Github 签名是 “废材程序员”,真是太贴切了。 treehole-jekyll (一个有点想法的博客系统) 一个包含完整前后端功能的博客系统,使用 Java 开发,采用 sqlite 作为数据库,兼容了 Jekyll 的主题。 部署在小水管云服务器中,后因为优化力度不够+Java对内存的优化确实不是很好,导致小水管只够跑一个博客服务。不能忍受资源浪费,遂放弃自建服务回到 Github Pages 的怀抱。省心省力省钱。 Solid (为博客开发的模板引擎) 在上诉博客系统开发过程中,为了兼容 Jekyll 模板引擎语法,开发的基于 Java 的模板引擎,可以嵌入到 Springboot 中作为视图渲染引擎进行使用。后一直闲置至今。 Hermes (基于 RSS 的文章聚合推荐平台) 有一段时间特别痴迷 RSS,而当时市面上的 RSS 阅读器并没有独角兽的出现,找来找去没找到合适的,所以就种下了 “我要开发一个的蛊”,后来来来回回做了好几个, hermes 算是整体完成度比较高的一个。具备完整的前端+后台+RSS爬虫+任务调度。 还做过一个叫 Miner 的应用,后来也不了了之了。 专注了吗小程序 前段时间逛论坛的时候,发现微信小程序个人认证的费用降到了 30 块,本着宁可不用不能没有的原则赶紧充值上车了。 又本着空着也是空着,总要放点什么的原则,做了这款专注了吗的小程序。主打任务专注管理。 AI 套壳工具 AI 突然就火了,套壳工具如雨后春笋般冒出来,做肯定要做一个的,但是由于聊天回应特别慢,接口延迟的问题一直没法解决,所以一直也没进行推广。想用的可以联系我在后台免费加套餐。花钱暂时就免了吧。 cockroach2 (开源 java 爬虫框架) 有一段时间,对爬虫特别痴迷,而且正好那段时间在研究 Java 的对象管理,索性整合一下做了一个爬虫框架。主打用尽可能少的配置、写出一个灵活、健壮的爬虫。当时在第一个版本出来的时候还许下豪言,要做真正的分布式爬虫,而不仅仅是队列+爬虫;要集成分布式任务、分布式事务等等,后续由于工作繁忙也渐渐的都放下了。目前任然是一个小巧、灵活、健壮、可观测的爬虫框架。 ultraman-rpc (一个练手的RPC项目) 单纯练手项目,实现了基于接口的 RPC 调用。 fas-cloud (faas 平台) 云,是一个很迷人的概念。曾经我也立志在云上创出一片天。但是最终还是没有完成。 整体项目完成了 50% 左右,做了 Function 函数管理功能、Function模板管理功能、Function 执行以及动态管理功能,很遗憾没有做完,...

盘点那些年我做过的东西

探索 Kafka 消息丢失的问题和解决方案

在构建基于 Kafka 的消息处理系统中,消息丢失是一个需要深入研究的重要问题。强大的系统不仅依赖于其功能,而且依赖于其可靠性。因此,理解消息丢失的原因,并采取必要的措施确保消息的一致性和完整性,是构建高效可靠消息系统的重要组成部分。本文将详细分析 Kafka 消息丢失的主要原因,并提供一系列策略来解决这个问题。 消息丢失的原因 生产者端问题: 在 Kafka 系统中,生产者负责发送消息。然而,由于网络故障或其他未知问题,生产者可能无法成功发送消息到 Kafka 服务器。 Kafka 服务端问题: Kafka 服务器可能会因为硬件故障、磁盘满或其他异常情况导致消息丢失。 消费者端问题: 消费者负责处理接收到的消息。但是,消费者在处理消息时可能会出现错误或崩溃,导致消息未被正确处理。 解决方案与措施 生产者端相关方案与措施 发送消息处理回调方法 由于消息的常规发送采用的异步方式,所以通常会忽略掉回调处理,为了保证消息的发送质量,一定需要对回调信息进行处理或者改为同步发送。 producer.send(new ProducerRecord<>(topic, messageKey, messageStr), new CallBack({...}); 设置有效的重试策略以及 acks 配置 我们可以在生产者端设置一个有效的重试策略,保证消息成功发送。例如,我们可以使用指数退避算法进行重试。这种算法会在每次重试失败后等待更长的时间,从而减轻服务器的压力,并增加消息成功发送的概率。 通过设置 Producer acks 机制,我们可以确保生产者收到 Kafka 服务器的确认,知晓消息是否被成功提交。 acks=0: 生产者在发送消息后不会等待任何确认,直接将消息视为发送成功。这种设置下,可能会出现消息丢失的情况,因为生产者不会等待服务器的任何确认即认为消息发送成功。 acks=1: 生产者在发送消息后会等待 Leader Broker 的确认,确认后即视为消息发送成功。这种设置下,消息的可靠性得到一定程度的保证,但仍有可能发生 Leader Broker 宕机导致消息丢失的情况。 acks=all: 生产者在发送消息后会等待 Leader Broker 和所有副本的确认,确认后才视为消息发送成功。这种设置下,消息的可靠性和一致性得到最高级别的保证,但同时也会增加网络延迟和资源消耗。 import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class KafkaProducerExample { private static final String TOPIC_NAME = "my-topic"; private static final String BOOTSTRAP_SERVERS = "localhost:9092"; public static void main(String[] args) { Properties props = new Properties(); props....

探索 Kafka 消息丢失的问题和解决方案