Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Commit

Permalink
#Release 0.9.4.1
Browse files Browse the repository at this point in the history
##Bug fix:
1. Improve speed between tasks who is running in one worker
2. Fix wrong timeout seconds
3. Add checking port when worker initialize and begin to kill old worker
4. Move worker hearbeat thread before initializing tasks
5. Move init netty-server before initializeing tasks
6. Check whether tuple's rootId is duplicated
7. Add default value into Utils.getInt
8. Add result function in ReconnectRunnable
9. Add operation to start Timetick
10. Halt process when master nimbus lost ZK node
11. Add exception catch when cgroups kill process
12. Speed up  reconnect to netty-server
13. Share one task hearbeat thread for all tasks
14. Quickly haltprocess when initialization failed.
15. Check web-ui logview page size
  • Loading branch information
bastiliu committed Aug 18, 2014
1 parent 62f17ba commit 1e8dacd
Show file tree
Hide file tree
Showing 41 changed files with 588 additions and 359 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ English Document and tutorials can be found on the [English JStorm Documentation


# Download
[JStrom 0.9.4](http://42.121.19.155/jstorm/jstorm-0.9.4.zip)
[JStrom 0.9.4.1](http://42.121.19.155/jstorm/jstorm-0.9.4.1.zip)

[JStrom 0.9.4.0](http://42.121.19.155/jstorm/jstorm-0.9.4.zip)

[JStrom 0.9.3.1](http://42.121.19.155/jstorm/jstorm-0.9.3.1.zip)

Expand All @@ -29,12 +31,12 @@ English Document and tutorials can be found on the [English JStorm Documentation
# Contributor
封仲淹([@longdafeng](https://github.com/longdafeng))<br/>
陈昱([@cycyyy](https://github.com/cycyyy))<br/>
李鑫([@tumen](https://github.com/tumen))<br/>
母延年([@muyannian](https://github.com/muyannian))<br/>
周鑫([@zhouxinxust](https://github.com/zhouxinxust))<br/>
罗实([@luoshi0801](https://github.com/luoshi0801))<br/>
刘键([@bastiliu](https://github.com/bastiliu))<br/>
方孝健([@hustfxj](https://github.com/hustfxj))<br/>
李鑫([@tumen](https://github.com/tumen))<br/>
母延年([@muyannian](https://github.com/muyannian))<br/>
周鑫(@[@zhouxinxust](https://github.com/zhouxinxust))<br/>
罗实([@luoshi0801](https://github.com/luoshi0801))<br/>


# Getting help
Expand Down
2 changes: 1 addition & 1 deletion example/sequence-split-merge/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jstorm.version>0.9.4</jstorm.version>
<jstorm.version>0.9.4.1</jstorm.version>
<storm.version>storm-0.9.2-incubating</storm.version>
</properties>
<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.jstorm.batch.BatchTopologyBuilder;
import com.alibaba.jstorm.cluster.StormConfig;
import com.alibaba.jstorm.local.LocalCluster;
import com.alibaba.jstorm.utils.JStormUtils;

public class SimpleBatchTopology {

Expand Down Expand Up @@ -52,11 +53,14 @@ private static void LoadYaml(String confPath) {
public static TopologyBuilder SetBuilder() {
BatchTopologyBuilder topologyBuilder = new BatchTopologyBuilder(
topologyName);

int spoutParallel = JStormUtils.parseInt(conf.get("topology.spout.parallel"), 1);

BoltDeclarer boltDeclarer = topologyBuilder.setSpout("Spout",
new SimpleSpout(), 3);
new SimpleSpout(), spoutParallel);

topologyBuilder.setBolt("Bolt", new SimpleBolt(), 1).shuffleGrouping(
int boltParallel = JStormUtils.parseInt(conf.get("topology.bolt.parallel"), 2);
topologyBuilder.setBolt("Bolt", new SimpleBolt(), boltParallel).shuffleGrouping(
"Spout");

return topologyBuilder.getTopologyBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import com.alibaba.jstorm.utils.JStormUtils;
import com.alipay.dw.jstorm.example.TpsCounter;
import com.alipay.dw.jstorm.example.sequence.bean.TradeCustomer;

Expand All @@ -22,6 +23,8 @@ public class TotalCount implements IRichBolt {
private TpsCounter tpsCounter;
private long lastTupleId = -1;

private boolean checkTupleId = false;

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
Expand All @@ -31,6 +34,8 @@ public void prepare(Map stormConf, TopologyContext context,
tpsCounter = new TpsCounter(context.getThisComponentId() +
":" + context.getThisTaskId());

checkTupleId = JStormUtils.parseBoolean(stormConf.get("bolt.check.tupleId"), false);

LOG.info("Finished preparation");
}

Expand All @@ -39,13 +44,15 @@ public void prepare(Map stormConf, TopologyContext context,

@Override
public void execute(Tuple input) {

Long tupleId = input.getLong(0);
if (tupleId <= lastTupleId) {
LOG.error("LastTupleId is " + lastTupleId + ", but now:" + tupleId);
}
lastTupleId = tupleId;


if (checkTupleId) {
Long tupleId = input.getLong(0);
if (tupleId <= lastTupleId) {
LOG.error("LastTupleId is " + lastTupleId + ", but now:" + tupleId);
}
lastTupleId = tupleId;
}

TradeCustomer tradeCustomer = (TradeCustomer) input.getValue(1);

tradeSum.addAndGet(tradeCustomer.getTrade().getValue());
Expand Down
Loading

0 comments on commit 1e8dacd

Please sign in to comment.