Skip to content

Commit

Permalink
get tweet trends filter by timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
zmyzheng committed Feb 24, 2020
1 parent 13a0dbb commit 53166a7
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TrendResponse {
public class Trend {

private String tagName;
private int frequency;
private long frequency;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zmyzheng.restapi.controller;

import io.zmyzheng.restapi.api.model.Trend;
import io.zmyzheng.restapi.api.model.TrendRequest;
import io.zmyzheng.restapi.domain.Tweet;
import io.zmyzheng.restapi.service.TweetService;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -31,5 +33,10 @@ public List<Tweet> getTweets(@RequestParam long timeFrom, @RequestParam long tim
return this.tweetService.getTweets(timeFrom, timeTo);
}

@PostMapping("/trends/search")
@ResponseStatus(HttpStatus.OK)
public List<Trend> searchTrends(@RequestBody TrendRequest trendRequest) {
return this.tweetService.queryTrends(trendRequest);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.zmyzheng.restapi.repository;

import io.zmyzheng.restapi.api.model.TrendRequest;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.ResultsExtractor;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.stereotype.Repository;

import java.util.List;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.range;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;

/**
* @Author: Mingyang Zheng
* @Date: 2020-02-23 14:09
*
* @Description: this class defines operations that are not covered in standard Spring Data Repositories
*/

@Slf4j
@Repository
public class EsOperationRepository {

private final ElasticsearchRestTemplate elasticsearchRestTemplate;

public EsOperationRepository(ElasticsearchRestTemplate elasticsearchRestTemplate) {
this.elasticsearchRestTemplate = elasticsearchRestTemplate;
}

public List<? extends Terms.Bucket> aggregateByField(TrendRequest trendRequest, String field) {
SearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(rangeQuery("timestamp").gte(trendRequest.getTimeFrom()).lte(trendRequest.getTimeTo()))
.withIndices("streaming").withTypes("tweets")
.addAggregation(terms(field).field(field).size(trendRequest.getTopN()))
.build();
List<? extends Terms.Bucket> buckets = elasticsearchRestTemplate.query(searchQuery, new ResultsExtractor<List<? extends Terms.Bucket>>() {
@Override
public List<? extends Terms.Bucket> extract(SearchResponse response) {
return ((Terms) response.getAggregations().get(field)).getBuckets();
}
});
return buckets;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.zmyzheng.restapi.service;

import io.zmyzheng.restapi.api.model.Trend;
import io.zmyzheng.restapi.api.model.TrendRequest;
import io.zmyzheng.restapi.domain.Tweet;

import java.util.List;
Expand All @@ -10,4 +12,6 @@
*/
public interface TweetService {
List<Tweet> getTweets(long timeFrom, long timeTo);

List<Trend> queryTrends(TrendRequest trendRequest);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package io.zmyzheng.restapi.service;

import io.zmyzheng.restapi.api.model.Trend;
import io.zmyzheng.restapi.api.model.TrendRequest;
import io.zmyzheng.restapi.domain.Tweet;
import io.zmyzheng.restapi.repository.EsOperationRepository;
import io.zmyzheng.restapi.repository.TweetRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.stream.Collectors;

/**
* @Author: Mingyang Zheng
Expand All @@ -16,13 +20,23 @@
public class TweetServiceImpl implements TweetService {

private final TweetRepository tweetRepository;
private final EsOperationRepository esOperationRepository;

public TweetServiceImpl(TweetRepository tweetRepository) {
public TweetServiceImpl(TweetRepository tweetRepository, EsOperationRepository esOperationRepository) {
this.tweetRepository = tweetRepository;
this.esOperationRepository = esOperationRepository;
}

@Override
public List<Tweet> getTweets(long timeFrom, long timeTo) {
return this.tweetRepository.findByTimestampBetween(timeFrom, timeTo);
}

@Override
public List<Trend> queryTrends(TrendRequest trendRequest) {
return this.esOperationRepository.aggregateByField(trendRequest, "hashTags")
.stream()
.map(bucket -> new Trend(bucket.getKeyAsString(), bucket.getDocCount()))
.collect(Collectors.toList());
}
}

0 comments on commit 53166a7

Please sign in to comment.