SNI是一个TLS的扩展字段,经常用于访问域名跳转到不同的后端地址。

配置方式如下:打开nginx.conf文件,以ttbb/nginx:nake镜像为例/usr/local/openresty/nginx/conf/nginx.conf

如下为默认的nginx.conf配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

#user nobody;
worker_processes 1;

#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;

#pid logs/nginx.pid;


events {
worker_connections 1024;
}


http {
include mime.types;
default_type application/octet-stream;

#log_format main '$remote_addr - $remote_user [$time_local] "$request" '
# '$status $body_bytes_sent "$http_referer" '
# '"$http_user_agent" "$http_x_forwarded_for"';

#access_log logs/access.log main;

sendfile on;
#tcp_nopush on;

#keepalive_timeout 0;
keepalive_timeout 65;

#gzip on;

server {
listen 80;
server_name localhost;

#charset koi8-r;

#access_log logs/host.access.log main;

location / {
root html;
index index.html index.htm;
}

#error_page 404 /404.html;

# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}

# proxy the PHP scripts to Apache listening on 127.0.0.1:80
#
#location ~ \.php$ {
# proxy_pass http://127.0.0.1;
#}

# pass the PHP scripts to FastCGI server listening on 127.0.0.1:9000
#
#location ~ \.php$ {
# root html;
# fastcgi_pass 127.0.0.1:9000;
# fastcgi_index index.php;
# fastcgi_param SCRIPT_FILENAME /scripts$fastcgi_script_name;
# include fastcgi_params;
#}

# deny access to .htaccess files, if Apache's document root
# concurs with nginx's one
#
#location ~ /\.ht {
# deny all;
#}
}


# another virtual host using mix of IP-, name-, and port-based configuration
#
#server {
# listen 8000;
# listen somename:8080;
# server_name somename alias another.alias;

# location / {
# root html;
# index index.html index.htm;
# }
#}


# HTTPS server
#
#server {
# listen 443 ssl;
# server_name localhost;

# ssl_certificate cert.pem;
# ssl_certificate_key cert.key;

# ssl_session_cache shared:SSL:1m;
# ssl_session_timeout 5m;

# ssl_ciphers HIGH:!aNULL:!MD5;
# ssl_prefer_server_ciphers on;

# location / {
# root html;
# index index.html index.htm;
# }
#}

}

在最后面添加上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
stream {

map $ssl_preread_server_name $name {
backend.example.com backend;
default backend2;
}

upstream backend {
server 192.168.0.3:12345;
server 192.168.0.4:12345;
}

upstream backend2 {
server 127.0.0.1:8071;
}

server {
listen 12346;
proxy_pass $name;
ssl_preread on;
}
}

这个时候,我们已经开启了SNI转发的功能,如果你使用backend.example.com的域名访问服务器,就会转发到backend,如果使用其他域名,就会转发到backend2

测试的时候,让我们在/etc/hosts里进行设置,添加

1
127.0.0.1 backend.example.com

然后进行请求

1
curl https://backend.example.com:12346

这里注意请求要使用https,http协议或者是tcp可没有SNI的说法

nginx-sni-backend

发现请求的确实是backend

然后测试请求127.0.0.1:12346

1
curl https://127.0.0.1:12346

nginx-sni-127

准备工作

运行zookeeper

1
docker run -p 2181:2181 -d ttbb/zookeeper:stand-alone

代码参考

1
https://github.com/hezhangjian/maven-demo/tree/master/demo-zookeeper/src/main/java/com/github/hezhangjian/demo/zookeeper

第一次运行代码

创建一个临时有序Znode,程序维持一个小时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.github.hezhangjian.demo.zookeeper;

import com.github.hezhangjian.javatool.util.CommonUtil;
import com.github.hezhangjian.javatool.util.LogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.TimeUnit;

/**
* @author hezhangjian
*/
@Slf4j
public class TempOrderTest {

public static void main(String[] args) throws Exception {
LogUtil.configureLog();
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
client.start();
String path = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/seq", "World".getBytes());
log.info("path is [{}]", path);
CommonUtil.sleep(TimeUnit.HOURS, 1);
}

}

运行第一次

first-run

运行完之后zk

after-first-run

第二次运行代码

创建一个临时有序Znode,创建完成后立刻关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.github.hezhangjian.demo.zookeeper;

import com.github.hezhangjian.javatool.util.CommonUtil;
import com.github.hezhangjian.javatool.util.LogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.TimeUnit;

/**
* @author hezhangjian
*/
@Slf4j
public class TempOrderTest2 {

public static void main(String[] args) throws Exception {
LogUtil.configureLog();
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181")
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).build();
client.start();
String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/temp/seq", "World".getBytes());
log.info("path is [{}]", path);
client.close();
}

}

运行第二次结果

second-run

运行后zk

after-second-run
因为是临时节点,所以存在一会后删除

第三次仍然运行第一次的代码

运行第三次结果

third-run

zk

after-third-run

第四次,修改了zkPath的值。值得一提的是,同一个ZkPath下似乎共享临时有序节点的最大值。如果修改zkPath从/temp/seq到/temp/seqX,出现的并不是预估的0000,而是0003

fourth-run
也就是意味着,zk的临时node序号添加是根据父目录下一个标志计数的

zk

after-fourth-run

Hash-based multipath routing

该特性在Linux4.4版本引入,一个难以被大家发现的好处是,基于源地址路由,没有做地址转换,并不会在nf_conntrack中添加记录
它是对源IP和目标IP进行哈希处理(端口不参与哈希的计算)计算选路。配置的命令如下:
weight代表权重

通过网关负载均衡

1
2
3
ip route add default  proto static scope global \
nexthop via <gw_1> weight 1 \
nexthop via <gw_2> weight 1

通过网卡负载均衡

1
2
3
ip route add default  proto static scope global \
nexthop dev <if_1> weight 1 \
nexthop dev <if_2> weight 1

为什么我们想要业务主键,想要幂等

​ 在分布式微服务场景下,有太多的环节可以引发错误的处理(包括丢失或者重复处理),如果业务本身有幂等的特性,我们可以以较低的代价解决大部分问题。

​ 我们假设我们在k8s集群中维护着下面的系统,部署了数个网关实例,数个业务处理服务,一套Kafka集群,数个消费者服务,一个数据库,平时的业务流程是这样子的:

1
客户--->(step1) 网关--->(step2) 业务处理服务-->(step3)消息中间件如Kafka-->(step4)消费者消费-->(step5) 写入数据库

而我们希望做到的是,在海量数据量的情况下,仅仅付出较小的代价,使得客户的每一条消息都存入到我们的数据库,没有丢失或者重复。

在分布式的环境下,有很多地方都会出错,会引发消息的丢失或者重复,我们先假设上述系统没有做太多的可靠性加固,是如下工作的:

1
2
3
4
5
1. 客户简单地发送到网关
2. 网关发送到后端处理服务
3. 后端处理服务接收到请求,做简单校验后直接返回成功/失败,异步发送消息到Kafka
4. 消费者从Kafka拉取消息,拉取到消息就提交,不考虑写入数据库的成功或失败
5. 写入到数据库

举几个例子,那么我们会碰到类似这样的问题:

1
2
3
1. 客户发送到网关不考虑结果,即时我们返回失败,也不处理。显然会丢失消息,我们的系统并不能保证100%的消息处理成功
2. 网关在与业务处理服务的TCP的四次挥手阶段处理异常,导致业务处理成功(已发送到Kafka),但实际返回客户失败。导致消息重复
3. 消费者拉取到消息后,写入数据库失败,但此时offset已提交。消息丢失

我们先不考虑客户,先看平台侧,我们构筑了一个什么样的系统呢?

它既不能保证至少发一次(AT_LEAST_ONCE),也不能保证最多发一次(AT_MOST_ONCE),我们构筑了一个即可能多发也有可能少发的系统,当然,现在的基础设施很好,照着这样跑,可能丢失的数据也就是万中一二,但我们做技术,还是要有点追求,实现别人难以实现的事情才是我们的竞争力。

平台如何实现精确一次

实现端到端一次的技术核心是两阶段提交,以上述业务流程为例,实现了两阶段提交的流程应该是这样的:

1
2
3
4
1. 消费者从Kafka消费到一批数据,但并不commit提交
2. 消费者向数据库prepare这批数据
3. 消费者向Kafka提交Offset
4. 消费者向数据库commit这批数据

按照这个流程,还会有一些异常场景,比如

1
2
1.消费者向Kafka提交Offset后,突然宕机,重启的消费者无法恢复事务,消息丢失
2.消费者commit失败,消息丢失

第二种情况属于极端场景,因为我们执行的业务简单,只是insert操作,prepare成功,commit失败,可以打日志,报告警人工处理。但第一种情况是需要自动化解决的,因为我们不能对每条提交失败的事务都人工处理。那么我们需要的是:

我们需要消费者在进程级失败的时候,可以判断处于prepare阶段的事务是否需要恢复,这个可以对比Mysql,Mysql也使用了两阶段提交协议,每次重启的时候会判断redolog处于prepare状态,binlog是否完整,如果binlog完整,则恢复。这里redo log就像我们的数据库,binlog就是kafka。但我们现在的业务没有主键,每条消息都是独立的,我们无法区分,那些是要被正常放弃的事务,那些是重启的时候需要恢复的事务。这种情况我们无法处理。

但如果这时候我们有了主键,我们有了幂等,我们只要做到至少一次就可以保证平台达到端到端一次。因为多次向数据库插入相同的数据,并不会发生什么事。

平台如何实现至少一次

实现至少一次的核心技术是如果处理不成功就打死不提交,报告警等人工操作都不提交! 其他队列,缓存区,滑窗只不过是提升性能的手段。

1
2
3
4
1. 消费者从Kafka消费到一批数据,但并不commit提交
2. 消费者向数据库prepare这批数据
3. 消费者向数据库commit这批数据
4. 消费者向Kafka提交Offset

接下来我们来看与客户的通信,这次我们先看如何做到至少一次

与客户侧通信如何做到至少一次

前面说到客户可能不处理你的返回值,碰到这样的客户其实是你赚了,客户的系统连这个都不重视,那想必也不会在意你是否做到了端到端一次吧,丢失了数据想必也不是特别在意。一个端到端一次的系统,一定需要输入端和输出端的配合,正如我前面举例:

1
客户发送到网关不考虑结果,即时我们返回失败,也不处理。显然会丢失消息,

那么我们也需要客户的配合,客户检测到我们回复的结果是失败,重试一下,确保成功。现在我们已经实现了至少一次了吗?

没有,还记得我前面说的这个吗?

1
3. 后端处理服务接收到请求,做简单校验后直接返回成功/失败,异步发送消息到Kafka

我们需要后端处理服务接收到请求后,确保发送Kafka成功,再回复用户成功

与客户侧通信做到精确一次

假设前面的方案我都已经做了,在什么情况下我们会重复呢?

1
2
3
1. 网关在与业务处理服务的TCP的四次挥手阶段处理异常,导致业务处理成功(已发送到Kafka),但实际返回客户失败。导致消息重复
2. 业务处理服务与网关的TCP的四次挥手阶段处理异常,导致业务处理成功,但实际返回客户失败
3. 客户的系统重启,成功应答并没有成功传达

为了实现两阶段提交协议,客户在发送前需要确认这个消息是否已经处理过了,但是没有主键,我们无法提供给客户这样子的信息。(如果考虑性能的话,整个系统端到端的事务,基本与高吞吐无缘了)
而且,两阶段提交协议也需要客户做很多的工作,实际中也很难落地。

总结

  • 我们需要一个业务上的主键,它可以是组合主键(mysql, mongo),或者是single主键(更适合cassandra和redis),使得我们可以提供更高QOS的保证,为此,仅需付出极小的代码,可能仅仅是数据库的主键一致性检查。
  • 如果系统和业务无关,任谁也难言真正的端到端一次。Flink无疑是流系统里面端到端一次的佼佼者,但上面也有着诸多限制。

备注

  • 事实上,要实现精确一次,系统的每两个环节之间都要做两阶段提交,为行文方便,省去网关,业务处理服务之间的两阶段提交
  • 推荐书籍 《基于Apache Flink的流处理》

灵活部署要求

常见的微服务开发下,可能会对微服务的日志打印有一些要求.常见的一种模式是,将日志文件挂载在主机路径中,然后在主机上启动filebeat收集日志.

以我的一个demo工程rabbitmq-adapt为例:

1
2
容器内打印路径: /opt/sh/logs/file.log
容器外挂载路径: /opt/log/rabbitmq/file.log

但是这样子,如果我们需要在一个虚拟机上跑两个rabbitmq-adapt的时候,日志路径会重复,这种情况下,不仅我们在虚拟机很难定位问题,而且filebeat也很难给日志标记上不同实例的标签.

所以首先,我们得把日志路径按pod实例的方式隔离,选择k8s中HOSTNAME环境变量作为文件路径前缀是一个不错的选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
akka@AkkadeMacBook-Pro ~ % kubectl exec -it rabbitmq-58c5f4ff6b-zthg4 bash
[root@rabbitmq-58c5f4ff6b-zthg4 /]# env
LANG=en_US.UTF-8
HOSTNAME=rabbitmq-58c5f4ff6b-zthg4
KUBERNETES_PORT_443_TCP_PROTO=tcp
KUBERNETES_PORT_443_TCP_ADDR=10.96.0.1
KUBERNETES_PORT=tcp://10.96.0.1:443
POD_NAME=rabbitmq-58c5f4ff6b-zthg4
PWD=/
HOME=/root
NODE_NAME=minikube
KUBERNETES_SERVICE_PORT_HTTPS=443
KUBERNETES_PORT_443_TCP_PORT=443
KUBERNETES_PORT_443_TCP=tcp://10.96.0.1:443
TERM=xterm
SHLVL=1
KUBERNETES_SERVICE_PORT=443
PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
POD_IP=172.17.0.2
KUBERNETES_SERVICE_HOST=10.96.0.1
LESSOPEN=||/usr/bin/lesspipe.sh %s
_=/usr/bin/env

k8s subPathExpr方案: 查阅资料,官方推荐的是这个方案

这个时候,你的映射关系是 /opt/sh/logs <==> /opt/log/rabbitmq/${POD_NAME}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
apiVersion: apps/v1
kind: Deployment
metadata:
name: rabbitmq
labels:
app: rabbitmq
spec:
replicas: 2
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
image: hezhangjian/rabbitmq-adapt:0.0.1
readinessProbe:
httpGet:
path: /readiness
port: 8083
initialDelaySeconds: 3
periodSeconds: 3
volumeMounts:
- mountPath: /opt/sh/log
name: rabbitmq-log
# subPath的方案很好,但对版本号要求很高,>=1.14
subPathExpr: $(POD_NAME)
resources:
limits:
memory: 4G
cpu: 1000m
requests:
memory: 500M
cpu: 250m
securityContext:
privileged: true
volumes:
# - name: rabbitmq-data
# hostPath:
# path: "/Users/akka/rabbitmq"
# type: DirectoryOrCreate
- name: rabbitmq-log
hostPath:
path: /opt/log/rabbitmq
type: DirectoryOrCreate


可以达到如下的效果:

1
minikube/                  rabbitmq-6df8f7565c-kh2h6/ rabbitmq-6df8f7565c-ss92l/

log4j2环境变量隔离方案:适用于在你的k8s版本还不够的情况下

这个时候,你的文件映射关系还是 /opt/sh/logs <==> /opt/log/rabbitmq

但是真正在打印日志的时候,把日志都打印到/opt/sh/logs/${POD_NAME}下,这样子也可以在一台vm上跑多个实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<?xml version="1.0" encoding="UTF-8" ?>

<Configuration status="warn" monitorInterval="10">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{yyyy-MM-dd,HH:mm:ss,SSSXXX}(%r):%4p%X[%t#%T]%l-->%m%n"/>
</Console>
<File name="FILE" fileName="${env:HOSTNAME}/file.log">
<PatternLayout pattern="%d{yyyy-MM-dd,HH:mm:ss,SSSXXX}(%r):%4p%X[%t#%T]%l-->%m%n"/>
</File>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
<AppenderRef ref="FILE"/>
</Root>
</Loggers>
</Configuration>

我们解决了两个容器在一个vm上打印日志的问题,紧接着,我们要分析filebeat的能力,filebeat能否区分这两个路径,把这两个路径打上不同的标签

运行并配置好filebeat,配置文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/sh/collect/log/es/*/*.log
tags: ["es"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
reload.period: 10s
setup.template.settings:
index.number_of_shards: 1
fields:
fields_under_root: true
setup.kibana:
output.elasticsearch:
hosts: ["localhost:9200"]
processors:
- add_host_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata:
kube_config: /opt/sh/collect/log/config
logging.level: debug

,查询收集上来的数据

1
2
curl 127.0.0.1:9200/filebeat-7.5.1-2020.01.23-000001/_search?pretty
收集上来的数据存在hostname字段,能区分代表单个实例的信息.

使用ES processor添加实例id信息

1
2
3
4
5
6
7
8
9
10
11
12
curl -X PUT "localhost:9200/_ingest/pipeline/attach_instance?pretty" -H 'Content-Type: application/json' -d'
{
"processors": [
{
"grok": {
"field": "log.file.path",
"patterns": ["/opt/sh/collect/log/es/%{WORD:instanceId}/es.log"]
}
}
]
}
'

然后修改filebeat配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/sh/collect/log/es/*/*.log
tags: ["es"]
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
reload.period: 10s
setup.template.settings:
index.number_of_shards: 1
fields:
fields_under_root: true
setup.kibana:
output.elasticsearch:
hosts: ["localhost:9200"]
pipeline: attach_instance
processors:
- add_host_metadata: ~
- add_docker_metadata: ~
- add_kubernetes_metadata:
kube_config: /opt/sh/collect/log/config
logging.level: debug

查询结果就会出现在instanceId字段

总结

三种方案均可以方便地实现同一vm上部署两个容器. 方案一只需要修改tosca模板,但要1.14版本才支持.方案二需要修改少量代码.

但前两种均不适合配置了hostnetwork,即独占主机的网络,原因,主机网络独占之后,两个容器的hostname都相同,实际中,已经独占网络的容器,还需要部署在一个节点的需求,应该比较少.

如果有,可以使用在es处处理文件路径,加上instanceId字段

业务需求分析与解决方案

在业务场景中,当需要利用ping命令对主机进行心跳探测时,直接在代码中fork进程执行ping命令虽然可行,但这种方法开销较大,并且处理流程易出错,与使用标准库相比缺乏优雅性。因此,本文探讨了使用Java的InetAddress类的isReachable方法作为替代方案。

根据资料指出,Java的InetAddress类在root用户权限下通过执行ping命令进行探测,在非root用户权限下则通过访问TCP端口7进行探测。为验证这一点,本文撰写了相应的demo代码并进行了测试(详见:GitHub - heart-beat)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import lombok.extern.slf4j.Slf4j;

import java.net.InetAddress;

@Slf4j
public class PingTestMain {

public static void main(String[] args) throws Exception {
String testIp = System.getProperty("TestIp");
InetAddress inetAddress = InetAddress.getByName(testIp);
boolean addressReachable = inetAddress.isReachable(500);
log.info("address is reachable is {}", addressReachable);
}

}

测试实验

root用户下执行程序

java-ping-root-success.png

java程序打印结果也是true

在普通用户权限下的测试

java-ping-user-fail.png

此时可以看到,我们的客户端程序向目标tcp7端口发送了一个报文,虽然java程序打印结果为true,但是因为收到了RST包导致的.在当今的网络安全要求下,7端口往往不会开放

在目标网络屏蔽TCP端口7的情况下执行程序

1
iptables -A INPUT -p tcp --dport 7 -j DROP

发送的报文没有收到RST包,此时java程序返回false.不是我们预期的结果

普通用户权限下携带特权的测试

进一步的研究发现,Java程序发送ping命令需要创建raw socket,这要求程序具有root权限或cap_net_raw权限。赋予Java程序创建raw socket的权限后重新测试,发现程序能够正确发送ping命令,达到预期效果。

1
2
setcap cap_net_raw+ep /usr/java/jdk-13.0.1/bin/java

发现如下报错

1
java: error while loading shared libraries: libjli.so: cannot open shared object file: No such file or directory

使用https://askubuntu.com/questions/334365/how-to-add-a-directory-to-linker-command-line-in-linux规避添加so文件权限

随后抓包,发现还是发送了ping命令,达到了我们预期的效果

总结

本文通过一系列测试得出结论,root用户权限下的Java程序会使用ping命令进行探测。若普通用户不具备相应权限,则会尝试探测TCP端口7,但在安全组未开启该端口的情况下会导致预期结果不一致。推荐赋予java程序特权,使得InetAddress类能够使用ping命令进行探测

如何使用显示过滤器

wireshark-display-filter1
或者按住 CTRL + F,输入显示过滤器
wireshark-display-filter2

二层显示过滤器举例

长度小于128字节的数据包

frame.len<=128

排除ARP流量

!arp

三层显示过滤器举例

只显示192.168.0.1 IP相关数据包

ip.addr==192.168.0.1

四层显示过滤器举例

排除RDP流量

!tcp.port==3389

具有SYN标志的TCP数据包

tcp.flags.syn==1

具有RST标志的TCP数据包

tcp.flags.rst==1

TCP确认时间较久

tcp.analysis.ack_rtt > 0.2 and tcp.len == 0
###启用TCP Relative Sequence Number的情况
如何启用?
Edit -> Preferences -> Protocols -> TCP Relative Sequence Numbers

握手被对方拒绝的包

tcp.flags.reset == 1 && tcp.seq == 1

客户端重传

tcp.flags.syn == 1 && tcp.analysis.retransmission

Tcp包含

tcp contains {str}

应用层显示过滤器举例

所有http流量

http

文本管理流量

tcp.port == 23 || tcp.port == 21

文本email流量

email || pop || imap

只显示访问某指定主机名的HTTP协议数据包

http.host == <”hostname”>

只显示包含HTTP GET方法的HTTP协议数据包

http.request.method == ‘GET’

只显示HTTP 客户端发起的包含指定URI请求的HTTP协议数据包

http.request.uri == <”Full request URI”>

只显示包含ZIP文件的数据包

http matches “.zip” && http.request.method == ‘GET’

如何使用捕获过滤器

点击捕获,选项,然后在所选择的捕获过滤器上输入对应的捕获表达式

wireshark-capture-filter1

wireshark-capture-filter2

抓包过滤器

  • type(类型) 限定符: 比如host,net,port限定符等
  • dir(方向) 限定符: src dst
  • Proto(协议类型)限定符: ether ip arp

二层过滤器举例

1
2
3
4
5
6
7
8
tcp dst port 135 //tcp协议,目标端口为135的数据包
ether host <Ethernet host> //让wireshark只抓取这个地址相关的以太网帧
ether dst <Ethernet host>
ether src <Ethernet src>
ether broadcast //Wireshark只抓取所有以太网广播流量
ether multicast //只抓取多播流量
ether proto <protocol>
vlan <vlan_id>

三层过滤器举例

1
2
3
4
5
6
7
8
ip #只抓取ipv4流量
ipv6
host 10.0.0.2
dest host <host>
src host <host>
broadcast #ip广播包
multicast #ip多播包
ip proto <protocol code> #ip数据包有多种类型,比如TCP(6), UDP(17) ICMP(1)

只抓取源于或者发往IPv6 2001::/16的数据包

net 2001::/16

只抓取ICMP流量

ip proto 1

只抓取ICMP echo request流量

icmp[icmptype]==icmp-echo
icmp[icmptype]==8

只抓取特定长度的IP数据包

ip[2:2] ==

只抓取具有特定TTL的IP数据包

ip[8] ==

抓取数据包的源和目的IP地址相同

ip[12:4] ==1 ip[16:4]

四层抓包过滤器举例

1
2
3
4
port <port>
dst port <port>
src port <port>
tcp portrange <p1>-<p2>

只抓取TCP中SYN或者FIN的数据包

tcp [tcpflags] & (tcp-syn | tcp-fin) != 0

只抓所有RST标记位置为1的TCP数据包

tcp[tcpflags] & (tcp-rst) != 0

tcp头部的常用标记位

  • SYN: 用来表示打开连接
  • FIN: 用来表示拆除连接
  • ACK: 用来确认收到的数据
  • RST: 用来表示立刻拆除连接
  • PSH: 用来表示应将数据提交给末端应用程序处理

抓取所有标记位都未置1的TCP流量

该报文可能用于端口探测,即如果
tcp[13] & 0x00 = 0

设置了URG位的TCP数据包

URG位,表示该数据包十分紧急,不进入缓冲区,直接送给进程
tcp[13] & 32 == 32

设置了ACK位的TCP数据包

tcp[13] & 16 == 16

设置了PSH位的TCP数据包

PSH代表这个消息要从缓冲区立刻发送给应用程序
tcp[13] & 8 == 8

设置了RST位的TCP数据包

tcp[13] & 4 == 4

设置了SYN位的TCP数据包

tcp[13] & 2 == 2

设置了FIN位的TCP数据包

tcp[13] & 1 == 1

TCP SYN-ACK数据包

tcp[13] == 18

抓取目的端口范围的数据包

tcp portrange 2000-2500

###tcpdump捕获过滤器

常见命令介绍

1
tcpdump -w hzj.pcap -s0 -iany port 1028

上面的命令代表
-w hzj.pcap 存储在hzj.pcap这个文件中
-s 0 代表抓取字节数不限制,在大多数linux系统下,默认捕获每个帧的前96个字节

tcpdump捕获一定范围的端口(9200-9400)

tcpdump portrange 9200-9400

tcpdump -r 可以阅读捕获的文件(建议拷贝到wireshark中分析)

WireShark安装

wireshark在windows和mac上的安装方式都比较简单,下面是Linux下的安装方式

1
2
3
4
5
sudo apt-add-repository ppa:wireshark-dev/stable
sudo apt-get update
sudo apt-get install wireshark
#以root权限启动
sudo wireshark

WireShark的名字解析

wireshark-name-resolve

  • L2层的名字解析,对Mac地址进行解析,返回机器名
  • L3层 ip解析为域名
  • L4层 端口号解析为协议端口号

Wireshark抓到的包更改时间格式

wireshark-time-format

查看EndPoint

点击Statistics->EndPoints,可以查看每一个捕获文件里的每个端点

wireshark-endpoint

查看网络会话

Statistics->Conversations. 查看地址A和地址B,以及每个设备发送或收到的数据包和字节数

wireshark-conversation

基于协议分层结构的统计数据

Statistics->Protocol Hierarchy

wireshark-protocol-hierarchy

跟随流功能

右键选中一个数据包,然后右键,follow。比如我在这里跟随一个tcp流

wireshark-tcp-stream

//这里也可以使用decode as解码功能,但是没有例子,暂不附图

查看IO图

Statistics->IO Graphs

wireshark-io-graph

双向时间图

Statistics->TCP Stream Graph -> Round Trip Time Graph
wireshark-rtt-graph

数据流图

Statistics->Flow Graph
wireshark-flow-graph

专家信息

Analyze->Expert Info Composite
wireshark-expert-info

触发的专家信息

对话消息

窗口更新 由接收者发送,用来通知发送者TCP接收窗口的大小已被改变

注意消息

TCP重传输 数据包丢失的结果,发生在收到重复的ACK,或者数据包的重传输计时器超时的时候

重复ACK 当一台主机没有收到下一个期望序列的数据包时,它会生成最近收到一次数据的重复ACK

零窗口探查ACK 用来响应零窗口探查数据包

窗口已满 用来通知传输主机及其接收者的TCP接收窗口已满

警告消息

上一段丢失 指明数据包丢失,发生在当数据流中一个期望的序列号被跳过时。

收到丢失数据包的ACK 发生在当一个数据包已经确认丢失但受到了其ACK数据包时

保活 当一个连接的保活数据包出现时触发

零窗口 当接收方已经达到TCP接收窗口大小时,发出一个零窗口通知,要求发送方停止传输数据

乱序 当数据包被乱序接收时,会利用序列号进行检测

快速重传输 一次重传会在收到一个重复ACK的20ms内进行

WireShark性能

Statistics -> Summary 查看平均速度

Analyze -> Expert Infos

Statistics -> TCP StreamGraph -> TCP Sequence Graph(Stenens)

TCP Previous segment not captured

在TCP传输过程中,同一台主机发出的数据段应该是连续的,即后一个包的Seq号等于前一个包的Seq + Len. 如果在网络包中没有找到,就会出现这个错误

TCP ACKed unseen segment

Wireshark发现被Ack的那个包没被wireshark捕获

TCP Out-of-Order

在TCP传输过程中,同一台主机发出的数据段应该是连续的,即后一个包的Seq号等于前一个包的Seq +
Len.当Wireshark发现后一个包的Seq号小于前一个包的Seq+Len 就乱序le

TCP Dup ACK

当乱序或者丢包的时候,接收方会收到Seq号比期望值大的包,每收到一个这种包就会Ack一次期望的Seq值

TCP Fast Retransmission

当发送方收到3个或以上TCP Dup ACK,就意识到之前发的包可能丢了,触发快速重传

TCP Retransmission

没有触发tcp超时重传,超时重传

TCP zerowindow

缓存区已满,不能再接收数据了

TCP window FUll

Wireshark检测到,发送方发送的数据会把接收方的接收窗口耗尽

NIO 主要概念介绍

1.Buffers(缓冲区)
Buffer类是常规JAVA类和channels(管道)之间的通道。Buffer包含固定长度的数组中,数组中存放着原始数据,封装在一个包含状态的对象中。管道可以消耗缓冲区的数据,也可以向缓冲区中存入数据。此外,还有一种特殊类型的缓冲区,用于内存映射文件。

2.Channels(通道)
NIO中引入的最抽象的概念就是Channel。通道对象代表着通信连接。通信连接可以是单向的,也可以是双向的。可以理解成缓冲区和IO设备之间的道路。

一些java.io中的旧类也可以使用channel。为了和使用通道的文件,socket连接,或多或少地添加了新方法

大多数通道可以工作在非块模式下,有很好的可扩展性,尤其是在和Selectors(选择器)同时使用的时候。

3.File locking and memory-mapped files(文件锁和内存映射文件)
FileChannel对象提供许多面向文件的功能。比如文件锁这个进程之间处理数据必不可少的功能。把内存映射到文件,在你看来,文件就像是在内存之上一样,省去了把文件拷贝到内存的操作。

4.Sockets
SocketChannel 提供了一种和网络套接字新的交互方法,SocketChannel可以工作在非阻塞模式可以用来和Selectors一起工作。作为结果,许多scoket可以多路传输,比起java.net里的传统socket类更高效

5.Selectors
Selectors提供准备就绪选择(查看channel是否准备就绪)。也可以确定你感兴趣的Channel。通过使用Selector,大量的活动中的I/O channel 可以被一个线程简单有效地监控操作。

6.Character sets(字符集)
提供了多种从byte流映射到字符的方式。

Buffer

Buffer即缓冲区,是包含有一定量数据的容器。Buffer的工作和Channel有着紧密的联系。Channel是I/O的出入口,buffer就是IO的来源或者目标。
要向外传输数据,把数据存放在buffer中交给channel
要接受数据,提供buffer让channel写入。

Buffer的类图.jpg

Capacity
Buffer可以包含的最大字节数。当Buffer创建的时候,容量被设置而且不可更改

Limit
Buffer中的第一个元素,这个元素不应该被读或者是被写。包含着在buffer中“存活”的数据个数

Position
下一个被操作的数据的位置。put和get操作会更新position的位置

Mark
一个有记忆的位置,设置了mark,之后可以回退到标记点,重新进行操作。

新创建一个大小为10的Buffer,概念中可以理解成是这样的

Buffer的类图.jpg

下面是Buffer的方法签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class Buffer{
public final int capacity()
public final int position()
public final Buffer position (int newPosition)
public final int limit ()
public final Buffer limit(int newLimit)
public final Buffer mark()
public final Buffer reset()
public final Buffer clear()
public final Buffer flip()
public final Buffer rewind()
public final int remaining()
public final boolean hasRemaining();
public abstract boolean isReadOnly();
}

你可能会觉得有一些方法应该返回空值,但是却返回了buffer对象,这是为了构建一个流式的API。比如像这样的代码

1
2
3
4
5
buffer.mark();
buffer.position(5);
buffer.reset()
//就可以写成下面这样
buffer.mark().position(5).reset();

所有的buffer都是可读的,但并不是所有的buffer都支持写操作,通过isReadOnly方法判断是否可以写,如果你试图向不支持写操作的buffer中写入数据,会抛出ReadOnlyBufferException异常。

Buffer管理着给定数目的数据元素。但是在大多数情况下,我们只关心其中的一部分数据。就像是,当我们想要向池子里倒水时,水盆里的水,只盛了一半。但我们需要追踪一些信息,buffer中已经有的数据,下一个处理哪个数据?position属性负责处理这个信息。当处理put(向buffer中写入数据)或者get方法(从buffer中获取数据)的时候,position就会更新。

尽管在JAVA NIO BUFFER(一)中,列表中没有put和get方法。但是每个Buffer都包含这两个方法,因为每个buffer的实现不同,需要处理不同的数据类型,没法被声明为抽象方法(nio在jdk1.4被引入,泛型是1.5)。我们用ByteBuffer作为例子,来看这两个方法的声明

1
2
3
4
5
6
7
public abstract class ByteBuffer extends Buffer implements Comparable {
//列出了一部分api
public abstract byte get();
public abstract byte get(int index);
public abstract ByteBuffer put(byte b);
public abstract ByteBuffer put(int index, byte b);
}

如果不指定具体的位置,put和get就会操作在现有的position属性上,并将position加1。如果超出了capacity。如果是读操作就会抛出BufferOverflowException,写操作就会BufferUnderflowException.如果是任意操作,也会抛出一个IndexOutOfBoundsException.

让我们试着对ByteBuffer中写入数据,如果执行下面的命令,JAVA NIO BUFFER(一)中的概念图,就会变成如下的样子。

1
buffer.put((byte)'H').put((byte)'e').put((byte)'l').put((byte)'l').put((byte)'o');

B2CE351E19F8ECB67C2AD0AEE197EE5B.jpg

猜一猜,如果执行这个命令

1
buffer.put(0,(byte)'M').put((byte)'w');

会变成什么样子?

1D7142D68BB086C4A65AD87549C30598.jpg

会变成这样!,第一个把0位置的H替换为M,然后紧接着的put操作查看到了position在5位置,把5位置写入W,然后position自加变为6.

我们准备好了Buffer,现在我们要将它交由Channel处理。现在假设我们填满了一个Buffer,emmmm,然后把它传给Channel对象,Channel对象对buffer调用get方法,然后position自加,会怎么样?IndexOutOfBoundsException被抛出了!

那怎么办?为了让Channel能够处理对象,我们需要把postion重设为你希望它开始处理的位置,顺便再设置limite为现在存储的位置,假设我们需要Channel从头开始处理对象,那么就应该做如下的处理:

1
2
3
buffer.limit(buffer.position()).position(0);
//上面的调用,等同于如下的方法
buffer.flip();

然后概念图就会变成下面这样:
157F5BA61609A874814F433E2D7CDA26.jpg
JAVA NIO BUFFER(一)中提到的rewind()方法,只把position设置为0,并不影响limit的位置。

进行完这个操作之后,你就可以试着读取里面的数据

1
2
3
4
5
6
7
8
9
10
11
//利用hasRemaining()方法判断Buffer中是否还有数据
for(int i=0;buffer.hasRemaining();i++) {
myByteArray[I]=buffer.get();
}
//上面的方法未必高效,循环体中每次循环都要进行一次判断
//你可以使用remaining()方法,返回剩余的数目
int count = buffer.remaining();
for(int i=0;buffer.count;i++) {
myByteArray[I]=buffer.get();
}
//如果你对Buffer有着惊人的控制,那么不进行数据检查将会是最快的

一旦Buffer使用完毕,它就可以进行复用,clear()方法把一个Buffer重置到一个空的状态。它不更改任何数据,把limit设置为capacity,并把position设置为0.至于里面是不是还有数据?这不重要,Buffer仅仅通过position来判断数据的“死活”为什么一定要将数据置空,这些操作难道不需要时间吗?下面的例子使用到了一些buffer的基本操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class BufferFillDrain {
//原代码作者 Ron Hitchens (ron@ronsoft.com)
private static int index = 0;

private static String[] strings = {
"Hello, this is ZhangJian",
"He likes Eminem",
"Put the dick in the dust",
"And? Fuck the world?",
"Lyrics are great",
"But what I like more",
"is the spirit"
};

public static void main(String[] args) throws Exception {
CharBuffer buffer = CharBuffer.allocate(100);
while( fillBuffer(buffer)) {
buffer.flip();
drainBuffer(buffer);
buffer.clear();
}
}

private static void drainBuffer (CharBuffer buffer) {
while (buffer.hasRemaining()) {
System.out.print(buffer.get());
}

System.out.println("");
}

private static boolean fillBuffer(CharBuffer buffer) {
if(index >= strings.length) {
return false;
}
String string = strings[index++];
for(int i=0;i<string.length();i++) {
buffer.put(string.charAt(i));
}
return true;
}

}

有一些情况,你只想把Buffer的一部分输出,然后下次再接着操作,下次操作的时候下标也要从0开始。这么做的话,就必须将未读的数据左移至开头,这样做毫无疑问是低效率的,如果一定要这么做的话,Buffer提供了compact方法

1
buffer.compact();

假设你输出了两个字符那么现在概念图就会变成这样:
48BBB9F226F5DD44A7CE940747B47BDE.jpg

然后你调用了compact方法
4A7013E3CD5086017F254507F2F79C47.jpg

4,5都会保持不变,还是同样的道理,Buffer不会做那些看上去合理但并没有用的操作。

Mark方法,允许buffer记下一个位置,如果reset被调用,那么position会转移到mark的位置。如果在mark还未指定的情况下调用reset,会抛出InvalidMarkException。如果你调用一些buffer方法,mark标记会被丢弃,比如rewind,clear和flip。如果调用带参数的limit或者position方法,如果该参数值在mark标记之前,那么mark也会废弃。

设计buffer的目标是为了有效的数据交换,在循环中依次移动数据非常地没有效率。如下列出了CharBuffer中大量移动数据的API

1
2
3
4
5
6
7
8
9
10
11
public abstract class CharBuffer extends Buffer implements CharSequence,Comparable {
//这只是方法的一部分
public CharBuffer get (char[] dst)
public CharBuffer get(char[] dst, int offset, int length)

public final CharBuffer put (char[] src)
public CharBuffer put(char[] src, int offset, int length)
public CharBuffer put(char[] src)

public final CharBuffer put(String src)
public CharBuffer put(String src, int start, int end)

两种方式从数组中复制数据。第一个方法,只携带一个数组作为参数。第二个携带偏移量和长度两个参数指定子数组。虽然最终这个和循环移动的结果相同,但是这种方式往往更加有效,因为通常这些方法都会优化移动数据或者调用native代码。

如果你要求的数据们无法被转移,那么会抛出BufferUnderflowException.或者你要求的参数,buffer无法全部填充,也会抛出异常。在尝试获取数据之前,你应该先判断一下容量是否充足:

1
2
3
4
5
6
char[] bigArray = new char[1000];
//获取buffer剩余输出
int length = buffer.remaining();
//如果length不够填充整个数组
buffer.get(bigArray,0,length);
processData(bigArray,length);

但是如果buffer持有的数据大于了你的数组,你可以这么做:

1
2
3
4
5
6
7
char[] smallArray = new char[10];

while (buffer.hasRemaining()) {
int length = Math.min(buffer.remaining(), smallArray.length);
buffer.get(smallArray, 0, length);
processData(smallArray, length);
}

buffer的put方法,如果buffer有空间存放这些数据,数据就会从现在buffer的position中开始写入,并且更新buffer的position值。如果空间不足,不会有任何数据写入,并抛出BufferOverflowException。

当然buffer的put方法也可以直接传入一个buffer作为参数,这和下面的操作等价:

1
dstBuffer.put(srcBuffer) 等价于 dstBuffer.put(srcBuffer.get());

如果是以String为参数的put方法,和charArray类似。尽管String并不是char的集合,但我们倾向于把String概念化为char的集合。

JAVA NIO BUFFER(一)看到了七个基本的buffer类,我们以CharBuffer为例。来看看如何创建一个CharBuffer对象:

1
2
3
4
5
6
7
8
9
10
public abstract class CharBuffer extends Buffer implements CharSequence, Comparable {
public static CharBuffer allocate (int capacity)

public static CharBuffer wrap(char[] array)
public static CharBuffer wrap(char[] array, int offset, int length)

public final boolean hasArray()
public final char[] array()
public final int arrayOffset()
}

通过包装或者分配都可以构造一个新的Buffer对象,分配方式创建了一个Buffer对象并且给它分配了私人空间。包装方式创建了一个Buffer对象但是没有给它分配私人空间(就使用你传递给它的数组参数)

1
2
3
4
5
//分配一个CharBuffer可以容纳100个字符
CharBuffer charBuffer = CharBuffer.allocate(100);
//如果你想要使用你自己的数组
char[] myArray = new char[100];
CharBuffer charBuffer = CharBuffer.wrap(myArray);

那么你觉得如下的代码会怎么样呢

1
CharBuffer charBuffer = CharBuffer.wrap(myArray,12,42);

会分配一个大小为30的数组给你操作?

不是,假如myArray的长度是100,CharBuffer还是掌控着长度为100的数组,只是初始的position为12,limit为42.而后面要提到的slice方法可以生成一个只能够操作给定范围的CharBuffer。

通过allocate或者wrap生成的buffer是不直接的。不直接的buffer包含有着数组。hasArray方法告诉你是否存在数组,如果这个方法返回true,那么array方法就会返回给你这个数组的引用。如果返回false,不要调用array或者arrayOffset方法,否则就会抛出UnsupportedOperationException.

如果Buffer是只读的,就算它是通过wrap数组方法生成的,调用它的array或者arrayOffset方法也会抛出ReadOnlyBufferException 防止你通过数组修改只读数据。

通过数组的形式存储数据,可以实现Buffer的功能,进而创建Buffer对象,但是Buffer可不仅仅只能通过数组才能实现。Buffer还可以管理其它buffer的数组。一旦这么做,就是一个view buffer对象。大多数view buffers是bytebuffers的视图。

View Buffers通常通过调用已经存在buffer的方法生成。创建的view buffer不仅可以通过get,put方法操作原有的buffer,而且如果原来的buffer是直接的,view buffer也可以得到同样的性能优势。以CharBuffer为例,查看它的相关方法声明:

1
2
3
4
5
public abstract class CharBuffer extends Buffer implements CharSequence, Comparable {
public abstract CharBuffer duplicate();
public abstract CharBuffer asReadOnlyBuffer();
public abstract CharBuffer slice();
}

duplicate方法创建了一个跟原来相似的新的buffer。两个buffer共享数据,有相同的容量,但是两个buffer独自管理自己的position,limit和mark。对数据的更改会反应在两个buffer之上。

你也可以通过asReadOnlyBuffer来创建一个只读的CharBuffer,大部分和duplicate相同,新的buffer会禁用put方法,而且它的isReadOnly方法会返回true。如果试图破坏这个CharBuffer的只读属性,会抛出ReadOnlyBufferException,值得一提的是,对数据元素的更改也会反应在只读的CharBuffer上。

slice方法跟duplicate方法也很相似,但是slice方法返回的是一部分,下图说明一个原本大小为8的CharBuffer被slice之后,生成的新CharBuffer的属性

1
2
3
CharBuffer buffer = CharBuffer.allocate(8);
buffer.position(3).limit(5);
CharBuffer sliceBuffer = buffer.slice();

3E8A4D8D1FB35C5E069CC57907621DA0.jpg
这些方法都不会对mark属性进行操作。

除了布尔类型,其他基本类型都有自身的Buffer类,但是byteBuffer还有不少其他特性。操作系统和他的IO设备来看,byte是最基本的数据单元。需要把其他类型的数据转化为bytes来操作。为了方便参阅,这里列出来了ByteBuffer的完整API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public abstract class ByteBuffer extends Buffer implements Comparable {
public static ByteBuffer allocate (int capacity)
public static ByteBuffer allocateDirect (int capacity)
public abstract boolean isDirect( );
public static ByteBuffer wrap (byte[] array, int offset,int length)
public static ByteBuffer wrap (byte[] array)
public abstract ByteBuffer duplicate( );
public abstract ByteBuffer asReadOnlyBuffer( );
public abstract ByteBuffer slice( );
public final boolean hasArray( )
public final byte [] array( )
public final int arrayOffset( )
public abstract byte get( );
public abstract byte get (int index);
public ByteBuffer get (byte[] dst, int offset, int length)
public ByteBuffer get (byte[] dst, int offset, int length)
public abstract ByteBuffer put (byte b);
public abstract ByteBuffer put (int index, byte b);
public ByteBuffer put (ByteBuffer src)
public ByteBuffer put (byte[] src, int offset, int length)
public final ByteBuffer put (byte[] src)
public final ByteOrder order( )
public final ByteBuffer order (ByteOrder bo)
public abstract CharBuffer asCharBuffer( );
public abstract ShortBuffer asShortBuffer( );
public abstract IntBuffer asIntBuffer( );
public abstract LongBuffer asLongBuffer( );
public abstract FloatBuffer asFloatBuffer( );
public abstract DoubleBuffer asDoubleBuffer( );
public abstract char getChar( );
public abstract char getChar (int index);
public abstract ByteBuffer putChar (char value);
public abstract ByteBuffer putChar (int index, char value);
public abstract short getShort( );
public abstract short getShort (int index);
public abstract ByteBuffer putShort (short value);
public abstract ByteBuffer putShort (int index, short value);
public abstract int getInt( );
public abstract int getInt (int index);
public abstract ByteBuffer putInt (int value);
public abstract ByteBuffer putInt (int index, int value);
public abstract long getLong( );
public abstract long getLong (int index);
public abstract ByteBuffer putLong (long value);
public abstract ByteBuffer putLong (int index, long value);
public abstract float getFloat( );
public abstract float getFloat (int index);
public abstract ByteBuffer putFloat (float value);
public abstract ByteBuffer putFloat (int index, float value);
public abstract double getDouble( );
public abstract double getDouble (int index);
public abstract ByteBuffer putDouble (double value);
public abstract ByteBuffer putDouble (int index, double value);
public abstract ByteBuffer compact( );
public boolean equals (Object ob);
public int compareTo (Object ob);
public String toString( );
public int hashCode( );
}

除了boolean类型之外,byte占1个byte,char两个,short两个,int四个,long八个,float四个,double八个。虽然这些字节一定是按照顺序的,但是也有大端和小端之分。(指的是连续的内存单元,先放高字节,还是先存放低字节)在java.nio中,字节顺序由ByteOrder这个类来封装

1
2
3
4
5
6
7
public final class ByteOrder {
public static final ByteOrder BIG_ENDIAN
public static final ByteOrder LITTLE_ENDIAN

public static ByteOrder nativeOrder()
public String toString()
}

这个类定义了两个自己实例的公共域,在JVM中,只有这两个实例,所以要比较的话,可以使用==。如果你需要知道JVM上的字节顺序,调用nativeOrder方法。

每一个buffer类都可以通过order方法知悉自己现在的字节顺序设置

1
public final ByteOrder order()

返回ByteOrder的两个实现之一,但和ByteBuffer不同,这个返回的对象只是可读的。ByteBuffer的默认字节顺序是大根端(无论运行在什么字节顺序的平台上),Java的默认字节顺序也是大根端,允许类文件,序列化的对象运行在任何JVM上。如果本地的硬件设备是小根端的话,这可能会有着性能的影响。把ByteBuffer作为其他数据类型接受的时候,使用本地的字节顺序会更有效。

ByteBuffer的字节顺序可以通过order方法改变

1
2
public final ByteOrder order()
public final ByteOrder order(ByteOrder bo)

如果buffer被创建为ByteBuffer的一个视图,那么order返回的就是当这个buffer创建时,原来的buffer的字节顺序,即使原来的buffer改变字节顺序,buffer的字节顺序也不会改变。

ByteBuffer和其他Buffer不同的是,它们可以作为Channel(通道)操作的起点或者终点。通道只接受ByteBuffer作为参数。

操作系统在内存区中进行IO操作,这些内存区域就是连续的byte。操作系统会直接进入进程的地址空间来转移数据。也就是说内存区的数据最好是连续的字节数。但是在JVM中,字节数组并不一定存储在连续的内存区域,GC可能会移动它们。如何存储数组,根据JVM的实现还有很大的区别。

因为这个原因,引入了direct buffer这一概念,direct buffer来处理通道和本地IO操作。尽最大努力把byte数据存储在一个channel可以直接使用的或者可以直接由本地方法通知操作系统,操作系统直接操作的内存区域。

这往往是IO最高效的选择,支持JVM能支持的最高效IO机制。非直接的Buffer也可以传递给channel,但是会导致性能的损失。通常情况下,非直接的buffer是不可能作为本地IO操作的目标的。如果你将一个非直接buffer传递给channel操作,channel可能会做如下的操作
1.创建一个临时的direct ByteBuffer对象
2.把非直接的ByteBuffer中的内容拷贝到1创建的对象中
3.利用临时对象进行低等级的IO操作
4.临时对象使用完毕,等待GC回收

创建DirectBuffer的代价可能会更高,DirectBuffer使用的内存是越过JVM,直接由本地代码分配的。而且DirectBuffer使用的内存无法被垃圾回收,因为它们在JVM堆之外。

1
2
3
4
5
public abstract class ByteBuffer extends Buffer implements Comparable {
public static ByteBuffer allocate(int capacity)
public static ByteBuffer allocateDirect(int capacity)
public abstract boolean isDirect();
}

调用allocateDirect方法来创建一个DirectBuffer。那些根据wrap方法创建的buffer,总是非直接的(non-direct)。

Channel

Channels是java.nio的第二个主要创新,提供了跟IO服务的直接连接。Channel是bytebuffer,文件,或者socket之间传输数据的导管。Channel提供了平台无关的抽象,但仍然可以比拟现代操作系统上native代码的IO能力。

1
2
3
4
public interface Channel {
public boolean isOpen();
public void close() throws IOException;
}

跟buffer不同,channel的实现在不同的操作系统上差距很大,所以channel只定义为一个接口,描述它可以做什么。通常情况下用native方法实现。只有两个方法,一个用来判断通道是否处于打开状态,一个用于关闭通道。

InterruptibleChannel是一个标记接口,代表这个通道是可中断的,可中断的通道在运行它们的线程中断时有特殊的特性。

IO分为两个大类,文件IO和流IO。通道也分为,文件通道和socket通道。Socket通道有工厂方法可以直接创建。但是一个文件通道不能够直接创建,只能够通过调用RandomAccessFile,FileInputStream,FileOutputStream的getChannel方法。你不能够直接创建一个FileChannel对象

1
2
3
4
5
6
7
8
9
10
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("some host",someport));

ServerSocketChannel ssc = new ServerSocketChannel().open();
ssc.socket().bind(new InetSocketAddress(somelocalport));

DatagramChannel dc = DatagramChannel.open();

RandomAccessFile raf = new RandomAccessFile("somefile","r");
FileChannel fc = raf.getChannel();

先看如下的API

1
2
3
4
5
6
7
8
public interface ReadableByteChannel extends Channel {
public int read (ByteBuffer dst) throws IOException;
}
public interface WritableByteChannel extends Channel {
public int write(ByteBuffer src) throws IOException;
}
public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {
}

通道可以是双向的也可以是单向的。根据需要实现这三个接口中的一个。

通道的read(),write()方法都以ByteBuffer作为参数,每个都返回操作了的字节数,并更新ByteBuffer中的position属性。
下面的例子展示了使用channel进行putget的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static final int BUFFER_SIZE = 16*1024;

public static void main(String[] args) throws IOException {

ReadableByteChannel source = Channels.newChannel(System.in);
WritableByteChannel dest = Channels.newChannel(System.out);
//call one of copy method
source.close();
dest.close();
}

public static void channelCopy1 (ReadableByteChannel src, WritableByteChannel dest) throws IOException{
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
while (src.read(buffer)!=-1) {
//准备好数据
buffer.flip();

//向通道中写入数据,可能阻塞
dest.write(buffer);

//调用compact方法,可以确保数据都被dest处理
buffer.compact();
}
//还有可能有数据剩余,此时只是没有可读数据了而已
while (buffer.hasRemaining()) {
dest.write(buffer);
}
buffer.clear();
}

public static void channelCopy2 (ReadableByteChannel src, WritableByteChannel dest) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
while (src.read(buffer)!=-1) {
buffer.flip();
while (buffer.hasRemaining()) {
dest.write(buffer);
}
buffer.clear();
}
}

通道可以工作在阻塞或者非阻塞模式下。如果工作在非阻塞模式下,调用它的线程永远不会sleep。请求操作要么立刻完成,要么返回结果表明没有进行任何操作。只有面向流的通道,像socket或者pipe才可以工作在非阻塞模式。

跟buffer不同,channel不可以被复用。一个打开的通道代表着跟特定IO服务之间的连接,还封装着连接的状态。当通道关闭的时候,连接丢失。调用通道的close方法会导致当关闭IO服务的时候,线程短暂地阻塞,就算工作在非阻塞模式也一样。对同一个通道调用多次close方法是无害的。如果第一个线程在close方法中阻塞,其他调用close方法的线程也会阻塞等待。后面的close方法什么都不做,然后立刻返回。

通道有一些和关闭中断相关的特性。如果一个通道实现了InterruptibleChannel接口。那么,如果线程正在被通道所阻塞,然后线程被中断,通道会被关闭,然后线程会收到一个ClosedByInterruptException异常。另外,如果线程被设置为中断状态,然后线程尝试获取一个通道,通道就会立刻关闭,抛出同样的异常。

先看如下的API

1
2
3
4
5
6
7
8
public interface ReadableByteChannel extends Channel {
public int read (ByteBuffer dst) throws IOException;
}
public interface WritableByteChannel extends Channel {
public int write(ByteBuffer src) throws IOException;
}
public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {
}

通道可以是双向的也可以是单向的。根据需要实现这三个接口中的一个。

通道的read(),write()方法都以ByteBuffer作为参数,每个都返回操作了的字节数,并更新ByteBuffer中的position属性。
下面的例子展示了使用channel进行putget的基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static final int BUFFER_SIZE = 16*1024;

public static void main(String[] args) throws IOException {

ReadableByteChannel source = Channels.newChannel(System.in);
WritableByteChannel dest = Channels.newChannel(System.out);
//call one of copy method
source.close();
dest.close();
}

public static void channelCopy1 (ReadableByteChannel src, WritableByteChannel dest) throws IOException{
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
while (src.read(buffer)!=-1) {
//准备好数据
buffer.flip();

//向通道中写入数据,可能阻塞
dest.write(buffer);

//调用compact方法,可以确保数据都被dest处理
buffer.compact();
}
//还有可能有数据剩余,此时只是没有可读数据了而已
while (buffer.hasRemaining()) {
dest.write(buffer);
}
buffer.clear();
}

public static void channelCopy2 (ReadableByteChannel src, WritableByteChannel dest) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
while (src.read(buffer)!=-1) {
buffer.flip();
while (buffer.hasRemaining()) {
dest.write(buffer);
}
buffer.clear();
}
}

通道可以工作在阻塞或者非阻塞模式下。如果工作在非阻塞模式下,调用它的线程永远不会sleep。请求操作要么立刻完成,要么返回结果表明没有进行任何操作。只有面向流的通道,像socket或者pipe才可以工作在非阻塞模式。

跟buffer不同,channel不可以被复用。一个打开的通道代表着跟特定IO服务之间的连接,还封装着连接的状态。当通道关闭的时候,连接丢失。调用通道的close方法会导致当关闭IO服务的时候,线程短暂地阻塞,就算工作在非阻塞模式也一样。对同一个通道调用多次close方法是无害的。如果第一个线程在close方法中阻塞,其他调用close方法的线程也会阻塞等待。后面的close方法什么都不做,然后立刻返回。

通道有一些和关闭中断相关的特性。如果一个通道实现了InterruptibleChannel接口。那么,如果线程正在被通道所阻塞,然后线程被中断,通道会被关闭,然后线程会收到一个ClosedByInterruptException异常。另外,如果线程被设置为中断状态,然后线程尝试获取一个通道,通道就会立刻关闭,抛出同样的异常。

通道提供了分散聚合的能力。就是说一次IO操作可以对应多个buffer。

对于写操作(向通道中写入数据),数据从数个buffer中汇合然后沿通道发送
对于读操作(从通道中读出数据),从通道中出来的数据分散到许多不同的buffer,尽可能地读取,直到数据或者buffer的可用空间被耗尽。

许多现代操作系统支持native vectored(矢量) IO;当你在一个通道上发起一个分散聚合请求时,请求会被解释成native方法。这样子buffer的复制和系统调用可以减少甚至消除。最好使用direct buffer,得到最优的性能。

1
2
3
4
5
6
7
8
9
public interface ScatteringByteChannel extends ReadableByteChannel {
public long read(ByteBuffer[] dsts) throws IOException;
public long read(ByteBuffer[] dots,int offset, int length) throws IOException;
}

public interface GatheringByteChannel extends WritableByteChannel {
public long write(ByteBuffer[] srcs) throws IOException;
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
}

看看下面的代码段
分散聚合类图

1
2
3
4
ByteBuffer header = ByteBuffer.allocateDirect(10);
ByteBuffer body = ByteBuffer.allocateDirect(80);
ByteBuffer[] buffers = {header, body};
int bytesRead = channel.read(buffers);

如果返回值bytesRead为48,那么head中拥有最先的10个数据,body中有38个。

相似的,我们可以组装数据在不同的buffer中,然后发送聚合

1
2
3
4
5
body.clear();
body.put("FOO".getBytes()).flip();
header.clear();
header.putShort(TYPE_FILE).putLong(body.limit()).flip();
long bytesWritten = channel.write(buffers);

FileChannel继承了ScatteringByteChannel,ByteChannel和GatheringByteChannel。在类中还引入了文件锁。下面是部分的API

1
2
3
4
5
6
7
8
9
10
public class FileLock implements AutoClosable{
public FileChannel channel()
public long position()
public long size()
public boolean isShared()
public boolean overlaps(long position, long size)
public boolean isValid()
public void release()
public String toString()
}

89EFD771-3B57-4FFF-A6D9-B48DC66E6F85.png
文件通道总是运行在阻塞模式而且不能在非阻塞模式下工作。面向流的非阻塞模式对文件操作来说意义不大。对于文件来说,真正需要的是异步IO,从操作系统发出一条或多条IO请求而不需要等待它们完成,而是在完成之后收到通知。

FileChannel对象可以通过调用RandomAccessFile,FileInputStream,FileOutputStream的getChannel方法得到,调用这个方法得到一个FileChannel对象,拥有和File对象一样的访问权限。

FileChannel是一个抽象类,你通过getChannel方法得到的是一个具体实现,往往实现中有着大量native代码。FileChannel是线程安全的。多线程可以同时调用他们的方法而不会引起任何问题,但不是所有操作都是多线程的,影响到通道的位置和文件大小是单线程的。如果有线程试图做这样的操作,那么它必须等待另一个线程执行完毕。

FileChannel保证同一个JVM中的所有实例看到的文件视图都是一致的。但这个视图和非JVM线程看到的可能相同也可能不同,这高度取决于底层的操作系统和文件系统。一般情况下来说,都是相同的。

FileChannel中的position,代表着数据下一个写入或者读取的地方,无参数的position方法,返回一个long类型的当前位置。第二个需要一个long类型的参数,如果你把参数设置为负,会抛出IllegalArgumentException,但是如果设置的超出了文件的大小不会抛出异常,但不会更改文件的大小。如果读操作超出了文件的大小,会返回EOF,如果是写操作,就会更改文件的大小。每个FileChannel对象都和一个File descriptor有着一对一的联系。而File descriptor是channel创建的时候被共享的。也就是说他们可以看到彼此对position的更改,如下例所示:

1
2
3
4
5
6
7
8
9
10
RandomAccessFile randomAccessFile = new RandomAccessFile("filename","r");
//设置position
randomAccessFile.seek(1000);
//创建一个channel
FielChannel fileChannel = randomAccessFile.getChannel();
System.out.println(fileChannel.position());//打印1000
randomAccessFile.seek(500);
System.out.println(fileChannel.position());//500
fileChannel.position(200);
System.out.println(randomAccessFile.getFilePointer());//200

和buffer相似,FileChannel的read方法和write方法都可以携带一个position参数,这样子比起不带参数的read,write方法更为高效,因为不需要更新channel的状态,可以直接通过native code实现。最好的是,多线程可以同时访问一个文件而不需要相互打扰。

如果视图读取文件的结尾,会收到EOF,如果在这时,往超出文件大小的地方写入,文件就会自动增长,但在这EOF和此时写入的位置之间无数据的地方就要看具体的操作系统和文件系统了,可能会导致一个file hole。当你觉得有必要调整文件的大小时,调用truncate方法,携带一个long类型作为参数,如果大于你此时文件的大小,无事发生过。但是如果小于,大于这个数值的数据会被丢弃。

最后force方法,要求强制把现在已经做了的更改应用到磁盘文件上,参数表明是否更新元数据(如文件所有者,访问权限,最后一次修改时间等)

文件锁FileLock

文件锁可以是shared(共享锁)或者exclusive(排他锁)。不是所有的平台都以同一种方式实现文件锁,不同的操作系统可能不同,同一操作系统上的不同文件系统也可能不同。有些操作系统只提供协同锁,有些只提供强制锁,有些则都提供。

文件锁是以文件为单位的,不是以通道,也不是线程。所以文件锁不适合同一个多个线程访问的情形。如果一个线程获得了给定文件的排他锁,第二个线程请求打开了一个新的channel,请求获得排他锁,请求会被批准。但如果这两个线程运行在不同的JVM中,第二个线程会阻塞,因为锁往往是根据进程来进行裁决,而不是线程。锁工作于一个文件,而不是单独的文件处理器或是通道。
/*
如果你需要控制多个线程之间的同步,你可能需要实现自己的轻量级的锁,内存映射文件可能是个适合的选择
*/

1
2
3
4
5
6
7
8
public abstract class FileChannel extends AbstractChannel implements ByteChannel, GatheringByteChannel, ScatteringByteChannel {

public final FileLock lock()
public abstract FileLock lock (long position, long size, boolean shared)

public final FileLock tryLock()
public abstract FileLock tryLock(long position, long size, boolean shared)
}

先看带参数的lock方法,获得给定区域的锁,自position开始,size大小,第三个布尔参数代表是锁是否共享。锁的区域并不受到文件大小的限制,锁可以超过文件的大小,也就是说在一段区域被写入数据之前锁住,是可行的。相反的,如果文件的大小超出了锁的限制,也就将不受到锁的限制。不带参数的lock方法,等效于
fileChannel.lock(0L,Long.MAX_VALUE, false);
如果你的请求是有效的,那么lock方法就会生效,但是要等待前一个锁(如果存在的话)释放。

tryLock方法是lock方法非阻塞的变种,功能和lock相似,但是如果不能立刻获得锁的话,tryLock会返回null。从创建开始,直到调用FileLock的release方法,FileLock对象都是有效的。可以通过isValid方法测试。一个锁是否有效可能会改变,但锁的位置,大小,是否共享,是不变的。

你可以通过isShared判断锁是否为共享锁,如果内在的文件系统操作系统不支持共享,那么这个方法总是会返回false,就算你传递true作为构造函数也一样。FileLock是线程安全的,多个线程可以通过一个FileLock进行操作。尽管FileLock对象和一个Channel相关,但是其实锁是和内在的文件联系的。这有可能造成冲突,也有可能死锁,如果你完成了操作而没有释放锁的话。一个典型的代码如下所示:

1
2
3
4
5
6
7
8
FileLock lock = fileChannel.lock();
try{
<perform read/write/whatever on channel>
} catch (IOException e) {
<handle unexcepted exception>
} finally {
lock.release();
}

下面是一个使用FileLock进行操作的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

private static final int SIZEOF_INT = 4;
private static final int INDEX_START = 0;
private static final int INDEX_COUNT = 10;
private static final int INDEX_SIZE = INDEX_COUNT * SIZEOF_INT;

private ByteBuffer buffer = ByteBuffer.allocate(INDEX_SIZE);
private IntBuffer indexBuffer = buffer.asIntBuffer();
private Random rand = new Random();

public static void main(String[] args) throws Exception{
boolean writer = false;
String filename;
//决定你所做的操作,读或者写
if(args.length!=2) {
System.out.println("Usage: [-r|-w] filename");
return;
}
writer = args[0].equals("-w");//true写false读
filename = args[1];
RandomAccessFile raf = new RandomAccessFile(filename,writer?"rw":"r");
FileChannel fc = raf.getChannel();//通过RandomAccessFile拿到fileChannel
LockTest lockTest = new LockTest();
if(writer) {
lockTest.doUpdates(fc);
} else {
lockTest.doQueries(fc);
}
}

void doQueries (FileChannel fc) throws Exception {
//如果是单次操作的话,没有这个循环,这里使用这个循环,为了多次
//运行程序,发现锁的工作原理
while (true) {
FileLock lock = fc.lock(INDEX_START,INDEX_SIZE,true);
int reps = rand.nextInt(60) + 20;
for(int i=0; i<reps; i++) {
int n = rand.nextInt(INDEX_COUNT);
int position = INDEX_START + (n*SIZEOF_INT);
buffer.clear();
fc.read(buffer,position);
int value = indexBuffer.get(n);
Thread.sleep(100);//doing some work
}
lock.release();
Thread.sleep(rand.nextInt(3000)+500);
}
}

void doUpdates (FileChannel fc) throws Exception {
while (true) {
FileLock lock = fc.lock(INDEX_START,INDEX_SIZE,false);
updateIndex(fc);
lock.release();
Thread.sleep(rand.nextInt(2000)+500);
}
}

private int idxval = 1;

private void updateIndex (FileChannel fc) throws Exception{
indexBuffer.clear();
for(int i=0; i<INDEX_COUNT; i++) {
idxval++;
indexBuffer.put(idxval);
Thread.sleep(500);
}
buffer.clear();
fc.write(buffer,INDEX_START);
}


}
0%